這篇文章給大家分享的是raft的實(shí)際運(yùn)用,相信大部分人都還沒學(xué)會(huì)這個(gè)技能,為了讓大家學(xué)會(huì),給大家總結(jié)了以下內(nèi)容,話不多說,一起往下看吧。
成都創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括古藺網(wǎng)站建設(shè)、古藺網(wǎng)站制作、古藺網(wǎng)頁制作以及古藺網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,古藺網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到古藺省份的部分城市,未來相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
1、raft.go 的raft結(jié)構(gòu)體 補(bǔ)充字段。 字段應(yīng)該盡量與raft論文的Figure2接近。
type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] dead int32 // set by Kill() // Your data here (2A, 2B, 2C). // Look at the paper's Figure 2 for a description of what // state a Raft server must maintain. state int // follower, candidate or leader resetTimer chan struct{} // for reset election timer electionTimer *time.Timer // election timer electionTimeout time.Duration // 400~800ms heartbeatInterval time.Duration // 100ms CurrentTerm int // Persisted before responding to RPCs VotedFor int // Persisted before responding to RPCs Logs []LogEntry // Persisted before responding to RPCs commitCond *sync.Cond // for commitIndex update //newEntryCond []*sync.Cond // for new log entry commitIndex int // Volatile state on all servers lastApplied int // Volatile state on all servers nextIndex []int // Leader only, reinitialized after election matchIndex []int // Leader only, reinitialized after election applyCh chan ApplyMsg // outgoing channel to service shutdownCh chan struct{} // shutdown channel, shut raft instance gracefully }
func (rf *Raft) GetState() (int, bool) { var term int var isleader bool // Your code here (2A). rf.mu.Lock() defer rf.mu.Unlock() term = rf.CurrentTerm isleader = rf.state == Leader return term, isleader }
type RequestVoteArgs struct { // Your data here (2A, 2B). Term int // candidate's term CandidateID int // candidate requesting vote LastLogIndex int // index of candidate's last log entry LastLogTerm int // term of candidate's last log entry } type RequestVoteReply struct { // Your data here (2A). CurrentTerm int // currentTerm, for candidate to update itself VoteGranted bool // true means candidate received vote }
1、獲取當(dāng)前節(jié)點(diǎn)的log個(gè)數(shù),以及最后一個(gè)log的term 確定當(dāng)前節(jié)點(diǎn)的term。
2、如果調(diào)用節(jié)點(diǎn)的term小于當(dāng)前節(jié)點(diǎn),返回當(dāng)前term,并且不為其投票。
3、如果調(diào)用節(jié)點(diǎn)的term大于當(dāng)前節(jié)點(diǎn),修改當(dāng)前節(jié)點(diǎn)的term,當(dāng)前節(jié)點(diǎn)轉(zhuǎn)為follower.
4、如果調(diào)用節(jié)點(diǎn)的term大于當(dāng)前節(jié)點(diǎn),或者等于當(dāng)前節(jié)點(diǎn)term并且調(diào)用節(jié)點(diǎn)的log個(gè)數(shù)大于等于當(dāng)前節(jié)點(diǎn)的log,則為調(diào)用節(jié)點(diǎn)投票。
5、投票后重置當(dāng)前節(jié)點(diǎn)的選舉超時(shí)時(shí)間。
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { // Your code here (2A, 2B). select { case <-rf.shutdownCh: DPrintf("[%d-%s]: peer %d is shutting down, reject RV rpc request.\n", rf.me, rf, rf.me) return default: } rf.mu.Lock() defer rf.mu.Unlock() lastLogIdx, lastLogTerm := rf.lastLogIndexAndTerm() DPrintf("[%d-%s]: rpc RV, from peer: %d, arg term: %d, my term: %d (last log idx: %d->%d, term: %d->%d)\n", rf.me, rf, args.CandidateID, args.Term, rf.CurrentTerm, args.LastLogIndex, lastLogIdx, args.LastLogTerm, lastLogTerm) if args.Term < rf.CurrentTerm { reply.CurrentTerm = rf.CurrentTerm reply.VoteGranted = false } else { if args.Term > rf.CurrentTerm { // convert to follower rf.CurrentTerm = args.Term rf.state = Follower rf.VotedFor = -1 } // if is null (follower) or itself is a candidate (or stale leader) with same term if rf.VotedFor == -1 { //|| (rf.VotedFor == rf.me && !sameTerm) { //|| rf.votedFor == args.CandidateID { // check whether candidate's log is at-least-as update if (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIdx) || args.LastLogTerm > lastLogTerm { rf.resetTimer <- struct{}{} rf.state = Follower rf.VotedFor = args.CandidateID reply.VoteGranted = true DPrintf("[%d-%s]: peer %d vote to peer %d (last log idx: %d->%d, term: %d->%d)\n", rf.me, rf, rf.me, args.CandidateID, args.LastLogIndex, lastLogIdx, args.LastLogTerm, lastLogTerm) } } } }
除了一些基本的初始化過程,新開了一個(gè)goroutine。
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me rf.applyCh = applyCh // Your initialization code here (2A, 2B, 2C). rf.state = Follower rf.VotedFor = -1 rf.Logs = make([]LogEntry, 1) // first index is 1 rf.Logs[0] = LogEntry{ // placeholder Term: 0, Command: nil, } rf.nextIndex = make([]int, len(peers)) rf.matchIndex = make([]int, len(peers)) rf.electionTimeout = time.Millisecond * time.Duration(400+rand.Intn(100)*4) rf.electionTimer = time.NewTimer(rf.electionTimeout) rf.resetTimer = make(chan struct{}) rf.shutdownCh = make(chan struct{}) // shutdown raft gracefully rf.commitCond = sync.NewCond(&rf.mu) // commitCh, a distinct goroutine rf.heartbeatInterval = time.Millisecond * 40 // small enough, not too small // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) go rf.electionDaemon() // kick off election return rf }
除了shutdown,還有兩個(gè)通道,一個(gè)是electionTimer,用于選舉超時(shí)。
一個(gè)是resetTimer,用于重置選舉超時(shí)。
注意time.reset是很難正確使用的。
一旦選舉超時(shí),調(diào)用go rf.canvassVotes()
// electionDaemon func (rf *Raft) electionDaemon() { for { select { case <-rf.shutdownCh: DPrintf("[%d-%s]: peer %d is shutting down electionDaemon.\n", rf.me, rf, rf.me) return case <-rf.resetTimer: if !rf.electionTimer.Stop() { <-rf.electionTimer.C } rf.electionTimer.Reset(rf.electionTimeout) case <-rf.electionTimer.C: rf.mu.Lock() DPrintf("[%d-%s]: peer %d election timeout, issue election @ term %d\n", rf.me, rf, rf.me, rf.CurrentTerm) rf.mu.Unlock() go rf.canvassVotes() rf.electionTimer.Reset(rf.electionTimeout) } } }
replyHandler是進(jìn)行請(qǐng)求返回后的處理。
當(dāng)前節(jié)點(diǎn)為了成為leader,會(huì)調(diào)用每一個(gè)節(jié)點(diǎn)的RequestVote方法。
如果返回過來的term大于當(dāng)前term,那么當(dāng)前節(jié)點(diǎn)變?yōu)閒ollower,重置選舉超時(shí)時(shí)間。
否則,如果收到了超過一半節(jié)點(diǎn)的投票,那么其變?yōu)榱薼eader,并立即給其他節(jié)點(diǎn)發(fā)送心跳檢測。
// canvassVotes issues RequestVote RPC func (rf *Raft) canvassVotes() { var voteArgs RequestVoteArgs rf.fillRequestVoteArgs(&voteArgs) peers := len(rf.peers) var votes = 1 replyHandler := func(reply *RequestVoteReply) { rf.mu.Lock() defer rf.mu.Unlock() if rf.state == Candidate { if reply.CurrentTerm > voteArgs.Term { rf.CurrentTerm = reply.CurrentTerm rf.turnToFollow() //rf.persist() rf.resetTimer <- struct{}{} // reset timer return } if reply.VoteGranted { if votes == peers/2 { rf.state = Leader rf.resetOnElection() // reset leader state go rf.heartbeatDaemon() // new leader, start heartbeat daemon DPrintf("[%d-%s]: peer %d become new leader.\n", rf.me, rf, rf.me) return } votes++ } } } for i := 0; i < peers; i++ { if i != rf.me { go func(n int) { var reply RequestVoteReply if rf.sendRequestVote(n, &voteArgs, &reply) { replyHandler(&reply) } }(i) } } }
1、leader調(diào)用每一個(gè)節(jié)點(diǎn)的AppendEntries方法。
2、如果當(dāng)前節(jié)點(diǎn)大于調(diào)用節(jié)點(diǎn),那么AppendEntries失敗。否則,修改當(dāng)前的term為最大。
3、如果當(dāng)前節(jié)點(diǎn)是leader,始終將其變?yōu)閒ollower(為了讓leader穩(wěn)定)
4、將當(dāng)前節(jié)點(diǎn)投票給調(diào)用者(對(duì)于落后的節(jié)點(diǎn))。
5、重置當(dāng)前節(jié)點(diǎn)的超時(shí)時(shí)間。
func (rf *Raft) heartbeatDaemon() { for { if _, isLeader := rf.GetState(); !isLeader { return } // reset leader's election timer rf.resetTimer <- struct{}{} select { case <-rf.shutdownCh: return default: for i := 0; i < len(rf.peers); i++ { if i != rf.me { go rf.consistencyCheck(i) // routine heartbeat } } } time.Sleep(rf.heartbeatInterval) } } func (rf *Raft) consistencyCheck(n int) { rf.mu.Lock() defer rf.mu.Unlock() pre := rf.nextIndex[n] - 1 var args = AppendEntriesArgs{ Term: rf.CurrentTerm, LeaderID: rf.me, PrevLogIndex: pre, PrevLogTerm: rf.Logs[pre].Term, Entries: nil, LeaderCommit: rf.commitIndex, } go func() { DPrintf("[%d-%s]: consistency Check to peer %d.\n", rf.me, rf, n) var reply AppendEntriesReply if rf.sendAppendEntries(n, &args, &reply) { rf.consistencyCheckReplyHandler(n, &reply) } }() } func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { select { case <-rf.shutdownCh: DPrintf("[%d-%s]: peer %d is shutting down, reject AE rpc request.\n", rf.me, rf, rf.me) return default: } DPrintf("[%d-%s]: rpc AE, from peer: %d, term: %d\n", rf.me, rf, args.LeaderID, args.Term) rf.mu.Lock() defer rf.mu.Unlock() if args.Term < rf.CurrentTerm { //DPrintf("[%d-%s]: AE failed from leader %d. (heartbeat: leader's term < follower's term (%d < %d))\n", // rf.me, rf, args.LeaderID, args.Term, rf.currentTerm) reply.CurrentTerm = rf.CurrentTerm reply.Success = false return } if rf.CurrentTerm < args.Term { rf.CurrentTerm = args.Term } // for stale leader if rf.state == Leader { rf.turnToFollow() } // for straggler (follower) if rf.VotedFor != args.LeaderID { rf.VotedFor = args.LeaderID } // valid AE, reset election timer // if the node recieve heartbeat. then it will reset the election timeout rf.resetTimer <- struct{}{} reply.Success = true reply.CurrentTerm = rf.CurrentTerm return }
如果心跳檢測失敗了,那么變?yōu)閒ollower,重置選舉超時(shí)。
// n: which follower func (rf *Raft) consistencyCheckReplyHandler(n int, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() if rf.state != Leader { return } if reply.Success { } else { // found a new leader? turn to follower if rf.state == Leader && reply.CurrentTerm > rf.CurrentTerm { rf.turnToFollow() rf.resetTimer <- struct{}{} DPrintf("[%d-%s]: leader %d found new term (heartbeat resp from peer %d), turn to follower.", rf.me, rf, rf.me, n) return } } }
看完上述內(nèi)容,你們掌握raft的運(yùn)用方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!
標(biāo)題名稱:raft的實(shí)際運(yùn)用
分享地址:http://vcdvsql.cn/article2/gjdcic.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、響應(yīng)式網(wǎng)站、移動(dòng)網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)公司、網(wǎng)站導(dǎo)航、搜索引擎優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)