文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

一篇文章带给你Etcd-Raft学习

2024-12-03 03:24

关注

从本质上说,Raft 算法是通过一切以领导者为准的方式,实现一系列值的共识和各节点日志的一致

Leader 选举

raft 算法本质上是一个大的状态机,任何的操作例如选举、提交数据等,最后都被封装成一个消息结构体,输入到 raft 算法库的状态机中。raft 算法其实由好几个协议组成,etcd-raft 将其统一定义在了 Message 结构体之中,以下总结了该结构体的成员用途:

  1. type Message struct { 
  2. Type             MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"` // 消息类型 
  3. To               uint64      `protobuf:"varint,2,opt,name=to" json:"to"` // 消息接收者的节点ID 
  4. From             uint64      `protobuf:"varint,3,opt,name=from" json:"from"` // 消息发送者的节点 ID 
  5. Term             uint64      `protobuf:"varint,4,opt,name=term" json:"term"` // 发送消息的节点的Term值。如果Term值为0,则为本地消息,在etcd-raft模块的实现中,对本地消息进行特殊处理。 
  6. LogTerm          uint64      `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"` // 该消息携带的第一条Entry记录的Term值,日志所处的任期ID 
  7. Index            uint64      `protobuf:"varint,6,opt,name=index" json:"index"` // 日志索引ID,用于节点向 Leader 汇报自己已经commit的日志数据ID 
  8. Entries          []Entry     `protobuf:"bytes,7,rep,name=entries" json:"entries"` // 如果是MsgApp类型的消息,则该字段中保存了Leader节点复制到Follower节点的Entry记录 
  9. Commit           uint64      `protobuf:"varint,8,opt,name=commit" json:"commit"` // 消息发送节点提交日志索引 
  10. Snapshot         Snapshot    `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"` // 在传输快照时,该字段保存了快照数据 
  11. Reject           bool        `protobuf:"varint,10,opt,name=reject" json:"reject"` // 主要用于响应类型的消息,表示是否拒绝收到的消息 
  12. RejectHint       uint64      `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"` //在Follower节点拒绝Leader节点的消息之后,会在该字段记录一个Entry索引值供Leader节点 
  13. Context          []byte      `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"` // 消息携带的一些上下文信息。例如,该消息是否与Leader节点转移相关 
  14. XXX_unrecognized []byte      `json:"-"

Message结构体相关的数据类型为 MessageType,MessageType 有 19 种。当然,并不是所有的消息类型都会用到上面定义的Message结构体中的所有字段,因此其中有些字段是Optinal的。

  1.    MsgHup            MessageType = 0  //当Follower节点的选举计时器超时,会发送MsgHup消息 
  2. MsgBeat           MessageType = 1  //Leader发送心跳,主要作用是探活,Follower接收到MsgBeat会重置选举计时器,防止Follower发起新一轮选举 
  3. MsgProp           MessageType = 2  //客户端发往到集群的写请求是通过MsgProp消息表示的 
  4. MsgApp            MessageType = 3  //当一个节点通过选举成为Leader时,会发送MsgApp消息 
  5. MsgAppResp        MessageType = 4  //MsgApp的响应消息 
  6. MsgVote           MessageType = 5  //当PreCandidate状态节点收到半数以上的投票之后,会发起新一轮的选举,即向集群中的其他节点发送MsgVote消息 
  7. MsgVoteResp       MessageType = 6  //MsgVote选举消息响应的消息 
  8. MsgSnap           MessageType = 7  //Leader向Follower发送快照信息 
  9. MsgHeartbeat      MessageType = 8  //Leader发送的心跳消息 
  10. MsgHeartbeatResp  MessageType = 9  //Follower处理心跳回复返回的消息类型 
  11. MsgUnreachable    MessageType = 10 //Follower消息不可达 
  12. MsgSnapStatus     MessageType = 11 //如果Leader发送MsgSnap消息时出现异常,则会调用Raft接口发送MsgUnreachable和MsgSnapStatus消息 
  13. MsgCheckQuorum    MessageType = 12 //Leader检测是否保持半数以上的连接 
  14. MsgTransferLeader MessageType = 13 //Leader节点转移时使用,本地消息 
  15. MsgTimeoutNow     MessageType = 14 //Leader节点转移超时,会发该类型的消息,使Follower的选举计时器立即过期,并发起新一轮的选举 
  16. MsgReadIndex      MessageType = 15 //客户端发往集群的只读消息使用MsgReadIndex消息(只读的两种模式:ReadOnlySafe和ReadOnlyLeaseBased) 
  17. MsgReadIndexResp  MessageType = 16 //MsgReadIndex消息的响应消息 
  18. MsgPreVote        MessageType = 17 //PreCandidate状态下的节点发送的消息 
  19. MsgPreVoteResp    MessageType = 18 //预选节点收到的响应消息   

然后是 raft 算法的实现,node 结构体实现了 Node 接口,对etcd-raft模块具体实现的一层封装,方便上层模块使用etcd-raft模块。其定义如下:

  1. type node struct { 
  2.  
  3. propc      chan msgWithResult      //该通道用于接收MsgProp类型的消息 
  4.  
  5. recvc      chan pb.Message         //除MsgProp外的其他类型的消息都是由该通道接收的 
  6.  
  7. confc      chan pb.ConfChangeV2    //当节点收到EntryConfChange类型的Entry记录时,会转换成ConfChange,并写入该通道中等待处理。在ConfChange中封装了其唯一 ID、待处理的节点 ID (NodeID 字段)及处理类型(Type 字段,例如,ConfChangeAddNode类型表示添加节点)等信息 
  8. confstatec chan pb.ConfState       //在ConfState中封装了当前集群中所有节点的ID,该通道用于向上层模块返回ConfState实例 
  9.  
  10. readyc     chan Ready              //Ready结构体的功能在上一小节已经介绍过了,该通道用于向上层模块返回Ready实例,即node.Ready()方法的返回值 
  11.  
  12. advancec   chan struct{}           //当上层模块处理完通过上述readyc通道获取到的Ready实例之后,会通过node.Advance()方法向该通道写入信号,从而通知底层raft实例 
  13.  
  14. tickc      chan struct{}                //用来接收逻辑时钟发出的信号,之后会根据当前节点的角色推进选举计时器和心跳计时器 
  15.  
  16. done       chan struct{}           //当检测到done通道关闭后,在其上阻塞的goroutine会继续执行,并进行相应的关闭操作 
  17.  
  18. stop       chan struct{}           //当node.Stop()方法被调用时,会向该通道发送信号,在后续介绍中会提到,有另一个goroutine会尝试读取该通道中的内容,当读取到信息之后,会关闭done通道。 
  19.  
  20. status     chan chan Status        //注意该通道的类型,其中传递的元素也是Channel类型,即node.Status()方法的返回值 
  21.  
  22.  rn        *RawNode 
  23.  

下面我们来看看 raft StateMachine 的状态机转换,实际上就是 raft 算法中各种角色的转换。每个 raft 节点,可能具有以下三种状态中的一种。

每一个状态都有其对应的状态机,每次收到一条提交的数据时,都会根据其不同的状态将消息输入到不同状态的状态机中。同时,在进行 tick 操作时,每种状态对应的处理函数也是不一样的。因此 raft 结构体中将不同的状态及其不同的处理函数,独立出来几个成员变量:

我们接着看 etcd-raft 状态转换。etcd-raft StateMachine 封装在 raft机构体中,etcd为了不让entry落后的太多的直接进行选举,多了一个其PreCandidate状态,转换如下图:

raft 状态转换的接口都在 raft.go 中,其定义如下:

  1. //在newRaft()函数中完成初始化之后,会调用 becomeFollower()方法将节点切换成 Follower状态,其中会设置raft实例的多个字段 
  2. func (r *raft) becomeFollower(term uint64, lead uint64) { 
  3.  r.step = stepFollower //设置函数处理Follower节点处理消息的行为 
  4.  r.reset(term) //在reset()方法中会重置raft实例的多个字段 
  5.  r.tick = r.tickElection //将tick字段设置成tickElection函数 
  6.  r.lead = lead //设置当前节点的leader节点 
  7.     //修改当前节点的角色 
  8.  r.state = StateFollower 
  9.  
  10. //如果当前集群开启了 PreVote 模式,当 Follower 节点的选举计时器超时时,会先调用becomePreCandidate()方法切换到PreCandidate状态,becomePreCandidate() 
  11. func (r *raft) becomePreCandidate() { 
  12.     //检查当前节点的状态,禁止leader直接切换到PreCandidate状态 
  13.  if r.state == StateLeader { 
  14.   panic("invalid transition [leader -> pre-candidate]"
  15.  } 
  16.     //设置函数处理Candidate节点处理消息的行为 
  17.  r.step = stepCandidate  
  18.  r.prs.ResetVotes() 
  19.  r.tick = r.tickElection 
  20.  r.lead = None 
  21.     //修改当前节点的角色 
  22.  r.state = StatePreCandidate  
  23. //当节点可以连接到集群中半数以上的节点时,会调用 becomeCandidate()方法切换到Candidate状态,becomeCandidate() 
  24. func (r *raft) becomeCandidate() { 
  25.  // TODO(xiangli) remove the panic when the raft implementation is stable 
  26.  if r.state == StateLeader { 
  27.   panic("invalid transition [leader -> candidate]"
  28.  } 
  29.     //在reset()方法中会重置raft实例的多个字段 
  30.  r.step = stepCandidate 
  31.  r.reset(r.Term + 1) //在reset()方法中会重置raft实例的多个字段 
  32.  r.tick = r.tickElection 
  33.  r.Vote = r.id //在此次的选举中,Candidate节点会将选票投给自己 
  34.     //修改当前节点的角色 
  35.  r.state = StateCandidate 
  36.  
  37. //当 Candidate 节点得到集群中半数以上节点的选票时,会调用 becomeLeader()方法切换成Leader状态,becomeLeader() 
  38. func (r *raft) becomeLeader() { 
  39.     //检查当前节点的状态,机制从follower直接切换成leader状态 
  40.  if r.state == StateFollower { 
  41.   panic("invalid transition [follower -> leader]"
  42.  } 
  43.  r.step = stepLeader 
  44.  r.reset(r.Term) //在reset()方法中会重置raft实例的多个字段 
  45.  r.tick = r.tickHeartbeat 
  46.  r.lead = r.id //将leader字段设置成当前节点的id 
  47.  r.state = StateLeader //更新当前节点的角色 
  48.     //检查未提交的记录中是否存在多条集群配置变更的Entry记录 
  49.  r.prs.Progress[r.id].BecomeReplicate() 
  50.  r.pendingConfIndex = r.raftLog.lastIndex() 
  51.  emptyEnt := pb.Entry{Data: nil} 
  52.     //向当前节点的raftLog中追加一条空的Entry记录 
  53.  if !r.appendEntry(emptyEnt) { 
  54.     } 
  55.  r.reduceUncommittedSize([]pb.Entry{emptyEnt}) 

tick 函数,每个状态对应的 tick 函数不同,下面分析两个tick:

  1. func (r *raft) tickElection() { 
  2.  r.electionElapsed++ //递增electionElapsed计时器 
  3.  
  4.  if r.promotable() && r.pastElectionTimeout() { //检查是否在集群中与检查单签的选举计时器是否超时 
  5.   r.electionElapsed = 0 
  6.   r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) //发起step处理pb.MsgHup类型消息。 
  7.  } 
  8.  
  9. func (r *raft) tickHeartbeat() { 
  10.  r.heartbeatElapsed++ //递增heartbeatElapsed计时器 
  11.  r.electionElapsed++ //递增electionElapsed计时器 
  12.  if r.electionElapsed >= r.electionTimeout { 
  13.   r.electionElapsed = 0 //重置选举计时器,leader节点不会主动发起选举 
  14.   if r.checkQuorum { //进行多数检查 
  15.    r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) //发起大多数检查。 
  16.   } 
  17.         //选举计时器处于electionElapsed~randomizedElectionTimeout时段之间时,不能进行leader转移 
  18.   if r.state == StateLeader && r.leadTransferee != None { 
  19.    r.abortLeaderTransfer() //清空raft.leadTransferee字段,放弃转移 
  20.   } 
  21.  } 
  22.  if r.state != StateLeader { //只有laeder能发送tickHeartbeat 
  23.   return 
  24.  } 
  25.  if r.heartbeatElapsed >= r.heartbeatTimeout { //心跳计时器超时 
  26.   r.heartbeatElapsed = 0 //重置心跳计时器 
  27.   r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) //发起step处理MsgBeat类型消息 
  28.  } 

跟随者、预选候选人、候选人、领导者 4 种节点状态都有分别对应的功能函数,当需要查看各节点状态相关的功能实现时(比如,跟随者如何接收和处理日志),都可以将对应的函数作为入口函数,来阅读代码和研究功能实现。

日志复制

这里重点看一下raft.appendEntry()方法,它的主要操作步骤如下:(1)设置待追加的Entry记录的Term值和Index值。

(2)向当前节点的raftLog中追加Entry记录。

(3)更新当前节点对应的Progress实例。

(4)尝试提交Entry记录,即修改raftLog.committed字段的值。

raft.appendEntry()方法的具体实现如下:

  1. func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { 
  2.  li := r.raftLog.lastIndex()//获取raftLog中最后一条记录的索引值 
  3.  for i := range es {//更新待追加记录的Term值和索引值 
  4.   es[i].Term = r.Term//Entry记录的Term指定为当前leader节点的任期号 
  5.   es[i].Index = li + 1 + uint64(i) //为日志记录指定的Index 
  6.  } 
  7.  li = r.raftLog.append(es...)//向raft中追加记录 
  8.     //更新当前节点对应的Progress,主要是更新Next和Match 
  9.  r.prs.Progress[r.id].MaybeUpdate(li) 
  10.     //尝试提交Entry记录 
  11.  r.maybeCommit() 
  12.  return true 

在Progress.mayUpdate()方法中,会尝试修改Match字段和Next字段,用来标识对应节点Entry记录复制的情况。Leader节点除了在向自身raftLog中追加记录时(即appendEntry()方法)会调用该方法,当Leader节点收到Follower节点的MsgAppResp消息(即MsgApp消息的响应消息)时,也会调用该方法尝试修改Follower节点对应的Progress实例。Progress.MayUpdate()方法的具体实现如下:

  1. func (pr *Progress) MaybeUpdate(n uint64) bool { 
  2.  var updated bool 
  3.  if pr.Match < n { 
  4.   pr.Match = n //n之前所有的Entry记录都已经写入对应节点的raftLog中 
  5.   updated = true 
  6.         //下面将Progress.paused设置为false,表示leader节点可以继续向对应Follower 
  7.         //节点发送MsgApp消息 
  8.   pr.ProbeAcked() 
  9.  } 
  10.  pr.Next = max(pr.Next, n+1)//将Next值加一,下一次复制Entry记录开始的位置 
  11.  return updated 

如果该Entry记录已经复制到了半数以上的节点中,则在raft.maybeCommit()方法中会尝试将其提交。除了 appendEntry()方法,在 Leader 节点每次收到 MsgAppResp 消息时也会调用maybeCommit()方法,maybeCommit()方法的具体实现如下:

  1. func (r *raft) maybeCommit() bool { 
  2.  mci := r.prs.Committed() 
  3.  return r.raftLog.maybeCommit(mci, r.Term) 
  4.  
  5. func (p *ProgressTracker) Committed() uint64 { 
  6.  return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress))) 
  7. //将node分两个组,JointConfig是大多数的组,有兴趣的看一看quorum包的实现 
  8. func (c JointConfig) CommittedIndex(l AckedIndexer) Index {//比较大多数的node的前俩个Index,返回Match的值。 
  9.  idx0 := c[0].CommittedIndex(l) 
  10.  idx1 := c[1].CommittedIndex(l) 
  11.  if idx0 < idx1 { 
  12.   return idx0 
  13.  } 
  14.  return idx1 
  15. //更新raftLog.committed字段,完成提交 
  16. func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { 
  17.  if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { 
  18.   l.commitTo(maxIndex) 
  19.   return true 
  20.  } 
  21.  return false 

etcd 将 raft 相关的所有处理都抽象为了 Message,通过 Step 接口处理各类消息的入口,首先根据Term"值"对消息进行分类处理,再根据消息的"类型"进行分类处理:

  1. func (r *raft) Step(m pb.Message) error { 
  2.  switch {//首先根据消息的Term值进行分类处理 
  3.  case m.Term == 0://本地消息不做处理。MsgHup,MsgProp和MsgReadIndex是本地消息 
  4.  case m.Term > r.Term: 
  5.  case m.Term < r.Term://细节部分,可以自己研究源码 
  6.  } 
  7.  switch m.Type {//根据Message的Type进行分类处理 
  8.  case pb.MsgHup://这里针对MsgHup类型的消息进行处理。 
  9.   if r.preVote {//检查是不是开启了preVote,如果是开启了先调用raft.hup方法,发起preVote。 
  10.   } else { 
  11.    r.hup(campaignElection)//下面讲述 
  12.   } 
  13.  case pb.MsgVote, pb.MsgPreVote: //对MsgVote,MsgPreVote类型的消息进行处理。 
  14.   canVote := r.Vote == m.From || 
  15.    (r.Vote == None && r.lead == None) || 
  16.    (m.Type == pb.MsgPreVote && m.Term > r.Term) 
  17.   if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { 
  18.    r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) 
  19.    if m.Type == pb.MsgVote { 
  20.     r.electionElapsed = 0 
  21.     r.Vote = m.From 
  22.    } 
  23.   } else { 
  24.    r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) 
  25.   } 
  26.  default://对于其他类型的消息处理,对应的node的step函数处理 
  27.   err := r.step(r, m) 
  28.   if err != nil { 
  29.    return err 
  30.   } 
  31.  } 
  32.  return nil 

这里主要使用hup函数对Message来做处理,在raft.campaign()方法中,除了完成状态切换,还会向集群中的其他节点发送相应类型的消息,例如,如果当前 Follower 节点要切换成 PreCandidate 状态,则会发送 MsgPreVote 消息:

  1. func (r *raft) hup(t CampaignType) { 
  2.  if r.state == StateLeader {//忽略leader 
  3.   return 
  4.  } 
  5.     //方法会检查prs字段中是否还存在当前节点对应的Progress实例,这是为了监测当前节点是否被从集群中移除了 
  6.     if !r.promotable() { 
  7.   return 
  8.  } 
  9.     //获取raftLog中已提交但未应用的Entry记录,异常处理 
  10.  ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) 
  11.  r.campaign(t) 
  12. func (r *raft) campaign(t CampaignType) { 
  13.     //该方法的会发送一条包含Term值和类型 
  14.  var term uint64 
  15.  var voteMsg pb.MessageType 
  16.  if t == campaignPreElection {//切换的目标状态是Precandidate 
  17.   r.becomePreCandidate() 
  18.   voteMsg = pb.MsgPreVote 
  19.         //确定要发送的Term值,这里只是增加了消息的Term值,并未增加raft.term字段的值 
  20.   term = r.Term + 1 
  21.  } else {//切换的目标状态是Candidate 
  22.   r.becomeCandidate() 
  23.   voteMsg = pb.MsgVote 
  24.         //给raft.Term字段的值,并将当前节点的选票投给自身 
  25.   term = r.Term 
  26.  } 
  27.  if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { 
  28.         //当得到足够的选票时,则将PreCandidate状态的节点切换成Candidate状态 
  29.         //Candidate状态的节点则切换成Leader状态 
  30.   if t == campaignPreElection { 
  31.    r.campaign(campaignElection) 
  32.   } else { 
  33.    r.becomeLeader() 
  34.   } 
  35.   return 
  36.  } 
  37.  var ids []uint64 
  38.  { 
  39.   idMap := r.prs.Voters.IDs() 
  40.   ids = make([]uint64, 0, len(idMap)) 
  41.   for id := range idMap { 
  42.    ids = append(ids, id) 
  43.   } 
  44.   sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) 
  45.  } 
  46.  for _, id := range ids {//状态切换完成之后,当前节点会向集群中所有节点发送指定类型的消息 
  47.   if id == r.id { //跳过当前节点自身 
  48.    continue 
  49.   } 
  50.         var ctx []byte 
  51.         //在进行Leader节点转移时,MsgPreVote或MsgVote消息会在Context字段中设置该特殊值 
  52.   if t == campaignTransfer { 
  53.    ctx = []byte(t) 
  54.   } 
  55.         //发送指定类型的消息,其中Index和LogTerm分别是当前节点的raftLog 
  56.         //最后一条消息的Index值和Term值 
  57.   r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) 
  58.  } 

Follower 节点在选举计时器超时的行为:首先它会通过 tickElection()创建MsgHup消息并将其交给raft.Step()方法进行处理;raft.Step()方法会将当前Follower节点切换成PreCandidate状态,然后创建MsgPreVote类型的消息,最后将该消息追加到raft.msgs字段中,等待上层模块将其发送出去。

本文转载自微信公众号「运维开发故事」,可以通过以下二维码关注。转载本文请联系运维开发故事公众号。

 

来源:运维开发故事内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯