etcd Raft库解析
概述
本文是博客解析raft算法及etcd raft库实现的系列三篇文章之一,之所以详细结合etcd实现解析raft算法原理及实现,因为etcd的raft实现是最接近论文本身的,结合论文原理一起阅读十分酸爽。这个系列文章的索引如下:
另外,我个人还针对etcd 3.1.10版本的raft相关代码实现做了一些代码的注释笔记,地址在此:etcd-3.1.10-codedump。
序言
今年初开始学习了解Raft协议,论文读下来之后还是决定结合一个成熟的代码进行更深的理解。etcd做为一个非常成熟的作品,其Raft库实现也非常精妙,屏蔽了网络、存储等模块,提供接口由上层应用者来实现。
本篇文章解析etcd的Raft库实现,基于etcd 3.1.10版本。etcd的Raft库,位于其代码目录的Raft中。
我自己也单独将3.1.10的代码拉出了一个专门添加了我阅读代码注释的版本,目前Raft这部分基本都做了注释,见: https://github.com/lichuang/etcd-3.1.10-codedump
以下在介绍的时候,可能会混用中文和英文术语,这里先列举出来:
英文 | 中文 |
---|---|
Term | 选举任期,每次选举之后递增1 |
Vote | 选举投票(的ID) |
Entry | Raft算法的日志数据条目 |
candidate | 候选人 |
leader | 领导者 |
follower | 跟随者 |
commit | 提交 |
propose | 提议 |
输入及输出
既然做为一个库使用,就有其确定的输入和输出接口,先来了解这部分再进行后续的展开讨论。
作为一个一致性算法的库,不难想象使用的一般场景是这样的:
- 应用层接收到新的写入数据请求,向该算法库写入一个数据。
- 算法库返回是否写入成功。
- 应用层根据写入结果进行下一步的操作。
然而,Raft库却相对而言更复杂一些,因为还有以下的问题存在:
- 写入的数据,可能是集群状态变更的数据,Raft库在执行写入这类数据之后,需要返回新的状态给应用层。
- Raft库中的数据不可能一直以日志的形式存在,这样会导致数据越来越大,所以有可能被压缩成快照(snapshot)的数据形式,这种情况下也需要返回这部分快照数据。
- 由于etcd的Raft库不包括持久化数据存储相关的模块,而是由应用层自己来做实现,所以也需要返回在某次写入成功之后,哪些数据可以进行持久化保存了。
- 同样的,etcd的Raft库也不自己实现网络传输,所以同样需要返回哪些数据需要进行网络传输给集群中的其他节点。
以上的这些,集中在raft/node.go的Ready结构体中,其包括以下成员:
成员名称 | 类型 | 作用 |
---|---|---|
SoftState | SoftState | 软状态,软状态易变且不需要保存在WAL日志中的状态数据,包括:集群leader、节点的当前状态 |
HardState | HardState | 硬状态,与软状态相反,需要写入持久化存储中,包括:节点当前Term、Vote、Commit |
ReadStates | []ReadStates | 用于读一致性的数据,后续会详细介绍 |
Entries | []pb.Entry | 在向其他集群发送消息之前需要先写入持久化存储的日志数据 |
Snapshot | pb.Snapshot | 需要写入持久化存储中的快照数据 |
CommittedEntries | []pb.Entry | 需要输入到状态机中的数据,这些数据之前已经被保存到持久化存储中了 |
Messages | []pb.Message | 在entries被写入持久化存储中以后,需要发送出去的数据 |
以上的成员说明,最开始看不一定能理解其含义和用法,不过在后续会慢慢展开讨论。
根据上面的分析,应用层在写入一段数据之后,Raft库将返回这样一个Ready结构体,其中可能某些字段是空的,毕竟不是每次改动都会导致Ready结构体中的成员都发生变化,此时使用者就需要根据情况,取出其中不为空的成员进行操作了。
在etcd项目中,也提供了使用Raft库的demo例子,在contrib/raftexample目录中,这里简单的演示了一下如何根据这个raft库实现一个简单的KV存储服务器,下面根据这里的代码结合着上面的Ready结构体,来分析如何使用etcd的Raft库。
raft库对外提供一个Node的interface,其实现有raft/node.go中的node结构体实现,这也是应用层唯一需要与这个raft库直接打交道的结构体,简单的来看看Node接口需要实现的函数:
函数 | 作用 |
---|---|
Tick | 应用层每次tick时需要调用该函数,将会由这里驱动raft的一些操作比如选举等。至于tick的单位是多少由应用层自己决定,只要保证是恒定时间都会来调用一次就好了。 |
Campaign | 调用该函数将驱动节点进入候选人状态,进而将竞争leader。 |
Propose | 提议写入数据到日志中,可能会返回错误。 |
ProposeConfChange | 提交配置变更 |
Step | 将消息msg灌入状态机中 |
Ready | 这里是核心函数,将返回Ready的channel,应用层需要关注这个channel,当发生变更时将其中的数据进行操作 |
Advance | Advance函数是当使用者已经将上一次Ready数据处理之后,调用该函数告诉raft库可以进行下一步的操作 |
这里大部分函数在这个demo中不需要进行关心,我们只看如何对接Ready结构体就好了。
raftexample中,首先在main.go中创建了两个channel:
- proposeC:用于提交写入的数据。
- confChangeC:用于提交配置改动数据。
然后分别启动如下核心的协程:
- 启动HTTP服务器,用于接收用户的请求数据,最终会将用户请求的数据写入前面的proposeC/confChangeC channel中。
- 启动raftNode结构体,该结构体中有上面提到的raft/node.go中的node结构体,也就是通过该结构体实现的Node接口与raft库进行交互。同时,raftNode还会启动协程监听前面的两个channel,收到数据之后通过Node接口的函数调用raft库对应的接口。
以上的交互流程就很清楚了,HTTP服务负责接收用户数据,再写入到两个核心channel中,而raftNode负责监听这两个channel:
- 如果收到proposeC channel的消息,说明有数据提交,则调用Node.Propose函数进行数据的提交。
- 如果收到confChangeC channel的消息,说明有配置变更,则调用Node.ProposeConfChange函数进行配置变更。
- 设置一个定时器tick,每次定时器到时时,调用Node.Tick函数。
- 监听Node.Ready函数返回的Ready结构体channel,有数据变更时根据Ready结构体的不同数据类型进行相应的操作,完成了之后需要调用Node.Advance函数进行收尾。
将以上流程用伪代码实现如下:
// HTTP server
HttpServer主循环:
接收用户提交的数据:
如果是PUT请求:
将数据写入到proposeC中
如果是POST请求:
将配置变更数据写入到confChangeC中
// raft Node
raftNode结构体主循环:
如果proposeC中有数据写入:
调用node.Propose向raft库提交数据
如果confChangeC中有数据写入:
调用node.Node.ProposeConfChange向raft库提交配置变更数据
如果tick定时器到期:
调用node.Tick函数进行raft库的定时操作
如果node.Ready()函数返回的Ready结构体channel有数据变更:
依次处理Ready结构体中各成员数据
处理完毕之后调用node.Advance函数进行收尾处理
到了这里,已经对raft的使用有一个基本的概念了,即通过node结构体实现的Node接口与raft库进行交互,涉及数据变更的核心数据结构就是Ready结构体,接下来可以进一步来分析该库的实现了。
raft库代码结构及核心数据结构
现在可以来看看raft库的代码组织了。
前面已经看到了raft/node.go文件中,提供出去的是Node接口及其实现node结构体,这是外界与raft库打交道的唯一接口,除此之外该路径下的其他文件并不直接与外界打交道。
接着是raft算法的实现文件,raft/raft.go文件,其中包含两个核心数据结构:
- Config:与raft算法相关的配置参数都包装在该结构体中。从这个结构体的命名是大写字母开头,就可以知道是提供给外部调用的。
- raft:具体实现raft算法的结构体。
除去以上两个文件之外,raft目录下的其他文件,都是间接给raft结构体服务的,下面的表格做一个总结和罗列:
结构体/接口 | 所在文件 | 作用 |
---|---|---|
Node接口 | node.go | 提供raft库与外界交互的接口 |
node | node.go | 实现Node接口 |
Config | raft.go | 封装raft算法相关配置参数 |
raft | raft.go | raft算法的实现 |
ReadState | read_only.go | 线性一致性读相关 |
readOnly | read_only.go | 线性一致性读相关 |
raftLog | log.go | 实现raft日志操作 |
Progress | progress.go | 该数据结构用于在leader中保存每个follower的状态信息,leader将根据这些信息决定发送给节点的日志 |
Storage接口 | storage.go | 提供存储接口,应用层可以按照自己的需求实现该接口 |
raft库日志存储相关结构
unstable
顾名思义,unstable数据结构用于还没有被用户层持久化的数据,而其中又包括两部分,如下图所示。
在上图中,前半部分是快照数据,而后半部分是日志条目组成的数组entries,另外unstable.offset成员保存的是entries数组中的第一条数据在raft日志中的索引,即第i条entries数组数据在raft日志中的索引为i + unstable.offset。
这两个部分,并不同时存在,同一时间只有一个部分存在。其中,快照数据仅当当前节点在接收从leader发送过来的快照数据时存在,在接收快照数据的时候,entries数组中是没有数据的;除了这种情况之外,就只会存在entries数组的数据了。因此,当接收完毕快照数据进入正常的接收日志流程时,快照数据将被置空。
理解了以上unstable中数据的分布情况,就不难理解unstable各个函数成员的作用了,下面逐一进行解释。
- maybeFirstIndex:返回unstable数据的第一条数据索引。因为只有快照数据在最前面,因此这个函数只有当快照数据存在的时候才能拿到第一条数据索引,其他的情况下已经拿不到了。
- maybeLastIndex:返回最后一条数据的索引。因为是entries数据在后,而快照数据在前,所以取最后一条数据索引是从entries开始查,查不到的情况下才查快照数据。
- maybeTerm:这个函数根据传入的日志数据索引,得到这个日志对应的任期号。前面已经提过,unstable.offset是快照数据和entries数组的分界线,因为在这个函数中,会区分传入的参数与offset的大小关系,小于offset的情况下在快照数据中查询,否则就在entries数组中查询了。
- stableTo:该函数传入一个索引号i和任期号t,表示应用层已经将这个索引之前的数据进行持久化了,此时unstable要做的事情就是在自己的数据中查询,只有在满足任期号相同以及i大于等于offset的情况下,可以将entries中的数据进行缩容,将i之前的数据删除。
- stableSnapTo:该函数传入一个索引i,用于告诉unstable,索引i对应的快照数据已经被应用层持久化了,如果这个索引与当前快照数据对应的上,那么快照数据就可以被置空了。
- restore:从快照数据中恢复,此时unstable将保存快照数据,同时将offset成员设置成这个快照数据索引的下一位。
- truncateAndAppend:传入日志条目数组,这段数据将添加到entries数组中。但是需要注意的是,传入的数据跟现有的entries数据可能有重合的部分,所以需要根据unstable.offset与传入数据的索引大小关系进行处理,有些数据可能会被截断。
- slice:返回索引范围在[lo-u.offset : hi-u.offset]之间的数据。
- mustCheckOutOfBounds:检查传入的数据索引范围是否合理。
Storage接口
Storage接口,提供了存储持久化日志相关的接口操作。其提供出来的接口函数说明如下。
- InitialState() (pb.HardState, pb.ConfState, error):返回当前的初始状态,其中包括硬状态(HardState)以及配置(里面存储了集群中有哪些节点)。
- Entries(lo, hi, maxSize uint64) ([]pb.Entry, error):传入起始和结束索引值,以及最大的尺寸,返回索引范围在这个传入范围以内并且不超过大小的日志条目数组。
- Term(i uint64) (uint64, error):传入日志索引i,返回这条日志对应的任期号。找不到的情况下error返回值不为空,其中当返回ErrCompacted表示传入的索引数据已经找不到,说明已经被压缩成快照数据了;返回ErrUnavailable:表示传入的索引值大于当前的最大索引。
- LastIndex() (uint64, error):返回最后一条数据的索引。
- FirstIndex() (uint64, error):返回第一条数据的索引。
- Snapshot() (pb.Snapshot, error):返回最近的快照数据。
我对这个接口提供出来的接口函数比较有疑问,因为搜索了etcd的代码,该接口只有MemoryStorage一个实现,而实际上MemoryStorage这个结构体还有其他的函数,比如添加日志数据的操作,但是这个操作并没有在Storage接口中声明。
接下来看看实现了Storage接口的MemoryStorage结构体的实现,其成员主要包括以下几个部分:
- hardState pb.HardState:存储硬状态。
- snapshot pb.Snapshot:存储快照数据。
- ents []pb.Entry:存储紧跟着快照数据的日志条目数组,即ents[i]保存的日志数据索引位置为i + snapshot.Metadata.Index。
raftLog的实现
有了以上的介绍unstable、Storage的准备之后,下面可以来介绍raftLog的实现,这个结构体承担了raft日志相关的操作。
raftLog由以下成员组成。
- storage Storage:前面提到的存放已经持久化数据的Storage接口。
- unstable unstable:前面分析过的unstable结构体,用于保存应用层还没有持久化的数据。
- committed uint64:保存当前提交的日志数据索引。
- applied uint64:保存当前传入状态机的数据最高索引。
需要说明的是,一条日志数据,首先需要被提交(committed)成功,然后才能被应用(applied)到状态机中。因此,以下不等式一直成立:applied <= committed。
raftLog结构体中,几部分数据的排列如下图所示。
这个数据排布的情况,可以从raftLog的初始化函数中看出来:
func newLog(storage Storage, logger Logger) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
lastIndex, err := storage.LastIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
// offset从持久化之后的最后一个index的下一个开始
log.unstable.offset = lastIndex + 1
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
// committed和applied从持久化的第一个index的前一个开始
log.committed = firstIndex - 1
log.applied = firstIndex - 1
return log
}
在这里:
- firstIndex:该值取自storage.FirstIndex(),可以从MemoryStorage的实现看到,该值是MemoryStorage.ents数组的第一个数据索引,也就是MemoryStorage结构体中快照数据与日志条目数据的分界线。
- lastIndex:该值取自storage.LastIndex(),可以从MemoryStorage的实现看到,该值是MemoryStorage.ents数组的最后一个数据索引。
- unstable.offset:该值为lastIndex索引的下一个位置。
- committed、applied:在初始的情况下,这两个值是firstIndex的上一个索引位置,这是因为在firstIndex之前的数据既然已经是持久化数据了,说明都是已经被提交成功的数据了。
因此,从这里的代码分析可以看出,raftLog的两部分,持久化存储和非持久化存储,它们之间的分界线就是lastIndex,在此之前都是Storage管理的已经持久化的数据,而在此之后都是unstable管理的还没有持久化的数据。
以上分析中还有一个疑问,为什么并没有初始化unstable.snapshot成员,也就是unstable结构体的快照数据?原因在于,上面这个是初始化函数,也就是节点刚启动的时候调用来初始化存储状态的函数,而unstable.snapshot数据,是在启动之后同步数据的过程中,如果需要同步快照数据时才会去进行赋值修改的数据,因此在这里并没有对它进行操作的地方。
raft消息结构体
大体而言,raft算法本质上是一个大的状态机,任何的操作例如选举、提交数据等,最后的操作一定是封装成一个消息结构体,输入到raft算法库的状态机中。
在raft/raftpb/raft.proto文件中,定义了raft算法中传输消息的结构体。熟悉raft论文的都知道,raft算法其实由好几个协议组成,但是在这里,统一定义在了Message这个结构体之中,以下总结了该结构体的成员用途。
成员 | 类型 | 作用 |
---|---|---|
type | MessageType | 消息类型 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 消息发送者的节点ID |
term | uint64 | 任期ID |
logTerm | uint64 | 日志所处的任期ID |
index | uint64 | 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID |
entries | Entry | 日志条目数组 |
commit | uint64 | 提交日志索引 |
snapshot | Snapshot | 快照数据 |
reject | bool | 是否拒绝 |
rejectHint | uint64 | 拒绝同步日志请求时返回的当前节点日志ID,用于被拒绝方快速定位到下一次合适的同步日志位置 |
context | bytes | 上下文数据 |
由于这个Message结构体,全部将raft协议相关的数据都定义在了一起,有些协议不是用到其中的全部数据,所以这里的字段都是optinal的,我个人感觉这样不太好,会导致混合在一起显得杂乱无章,所以这里还是将每个协议(即不同的消息类型)中使用的用途做一个记录,如下。
MsgHup消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgHup | 不用于节点间通信,仅用于发送给本节点让本节点进行选举 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
MsgBeat消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgBeat | 不用于节点间通信,仅用于leader节点在heartbeat定时器到期时向集群中其他节点发送心跳消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
MsgProp消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgProp | raft库使用者提议(propose)数据 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
entries | Entry | 日志条目数组 |
raft库的使用者向raft库propose数据时,最后会封装成这个类型的消息来进行提交,不同类型的节点处理还不尽相同。
candidate
由于candidate节点没有处理propose数据的责任,所以忽略这类型消息。
follower
首先会检查集群内是否有leader存在,如果当前没有leader存在说明还在选举过程中,这种情况忽略这类消息;否则转发给leader处理。
leader
leader的处理在leader的状态机函数针对MsgProp这种case的处理下,大体如下。
- 检查entries数组是否没有数据,这是一个保护性检查。
- 检查本节点是否还在集群之中,如果已经不在了则直接返回不进行下一步处理。什么情况下会出现一个leader节点发现自己不存在集群之中了?这种情况出现在本节点已经通过配置变化被移除出了集群的场景。
- 检查raft.leadTransferee字段,当这个字段不为0时说明正在进行leader迁移操作,这种情况下不允许提交数据变更操作,因此此时也是直接返回的。
- 检查消息的entries数组,看其中是否带有配置变更的数据。如果其中带有数据变更而raft.pendingConf为true,说明当前有未提交的配置更操作数据,根据raft论文,每次不同同时进行一次以上的配置变更,因此这里会将entries数组中的配置变更数据置为空数据。
- 到了这里可以进行真正的数据propose操作了,将调用raft算法库的日志模块写入数据,根据返回的情况向其他节点广播消息。
MsgApp/MsgSnap消息
MsgApp消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgApp | 用于leader向集群中其他节点同步数据的消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
entries | Entry | 日志条目数组 |
logTerm | uint64 | 日志所处的任期ID |
index | uint64 | 索引ID |
MsgSnap消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgSnap | 用于leader向follower同步数据用的快照消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
snapshot | Snapshot | 快照数据 |
如果说前面的MsgProp消息是集群中的节点向leader转发用户提交的数据,那么MsgApp消息就是相反的,是leader节点用于向集群中其他节点同步数据的。
在这里把MsgSnap消息和MsgApp消息放在一起,是因为MsgSnap消息做的事情其实跟前面提到的MsgApp消息是一样的:都是用于leader向follower同步数据。实际上对于leader而言,向某个节点同步数据这个操作,都封装在raft.sendAppend函数中,至于具体用的哪种消息类型由这个函数内部实现。
那么,什么情况下会用到快照数据来同步呢?raft算法中,任何的数据要提交成功,首先leader会在本地写一份日志,再广播出去给集群的其他节点,只有在超过半数以上的节点同意,leader才能进行提交操作,这一个流程在前面讲解MsgAppResp消息流程时做了解释。
但是,如果这个日志文件不停的增长,显然是不能接受的。因此,在某些时刻,节点会将日志数据进行压缩处理,就是把当前的数据写入到一个快照文件中。而leader在向某一个节点进行数据同步时,是根据该节点上的日志记录进行数据同步的。
比方说,leader上已经有最大索引为10的日志数据,而节点A的日志索引是2,那么leader将从3开始向节点A同步数据。
但是如果前面的数据已经进行了压缩处理,转换成了快照数据,而压缩后的快照数据实际上已经没有日志索引相关的信息了。这时候只能将快照数据全部同步给节点了。还是以前面的流程为例,假如leader上日志索引为7之前的数据都已经被压缩成了快照数据,那么这部分数据在同步时是需要整份传输过去的,只有当同步完成节点赶上了leader上的日志进度时,才开始正常的日志同步流程。 而同步数据时,需要区分两种情况:
MsgAppResp消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgAppResp | 集群中其他节点针对leader的MsgApp/MsgSnap消息的应答消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
index | uint64 | 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID |
reject | bool | 是否拒绝同步日志的请求 |
rejectHint | uint64 | 拒绝同步日志请求时返回的当前节点日志ID,用于被拒绝方快速定位到下一次合适的同步日志位置 |
在节点收到leader的MsgApp/MsgSnap消息时,可能出现leader上的数据与自身节点数据不一致的情况,这种情况下会返回reject为true的MsgAppResp消息,同时rejectHint字段是本节点raft最后一条日志的索引ID。
而index字段则返回的是当前节点的日志索引ID,用于向leader汇报自己已经commit的日志数据ID,这样leader就知道下一次同步数据给这个节点时,从哪条日志数据继续同步了。
leader节点在收到MsgAppResp消息的处理流程大体如下(stepLeader函数中MsgAppResp case的处理流程)。
-
首先,收到节点的MsgAppResp消息,说明该节点是活跃的,因此保存节点状态的RecentActive成员置为true。
-
接下来,再根据msg.Reject的返回值,即节点是否拒绝了这次数据同步,来区分两种情况进行处理。
msg.Reject为true的情况
如果msg.Reject为true,说明节点拒绝了前面的MsgApp/MsgSnap消息,根据msg.RejectHint成员回退leader上保存的关于该节点的日志记录状态。比如leader前面认为从日志索引为10的位置开始向节点A同步数据,但是节点A拒绝了这次数据同步,同时返回RejectHint为2,说明节点A告知leader在它上面保存的最大日志索引ID为2,这样下一次leader就可以直接从索引为2的日志数据开始同步数据到节点A。而如果没有这个RejectHint成员,leader只能在每次被拒绝数据同步后都递减1进行下一次数据同步,显然这样是低效的。
-
因为上面节点拒绝了这次数据同步,所以节点的状态可能存在一些异常,此时如果leader上保存的节点状态为ProgressStateReplicate,那么将切换到ProgressStateProbe状态(关于这几种状态,下面会谈到)。
-
前面已经按照msg.RejectHint修改了leader上关于该节点日志状态的索引数据,接着再次尝试按照这个新的索引数据向该节点再次同步数据。
msg.Reject为false的情况
这种情况说明这个节点通过了leader的这一次数据同步请求,这种情况下根据msg.Index来判断在leader中保存的该节点日志数据索引是否发生了更新,如果发生了更新那么就说明这个节点通过了新的数据,这种情况下会做以下的几个操作。
- 修改节点状态
- 如果该节点之前在ProgressStateProbe状态,说明之前处于探测状态,此时可以切换到ProgressStateReplicate,开始正常的接收leader的同步数据了。
- 如果之前处于ProgressStateSnapshot状态,即还在同步副本,说明节点之前可能落后leader数据比较多才采用了接收副本的状态。这里还需要多做一点解释,因为在节点落后leader数据很多的情况下,可能leader会多次通过snapshot同步数据给节点,而当 pr.Match >= pr.PendingSnapshot的时候,说明通过快照来同步数据的流程完成了,这时可以进入正常的接收同步数据状态了,这就是函数Progress.needSnapshotAbort要做的判断。
- 如果之前处于ProgressStateReplicate状态,此时可以修改leader关于这个节点的滑动窗口索引,释放掉这部分数据索引,好让节点可以接收新的数据了。关于这个滑动窗口设计,见下面详细解释。
-
判断是否有新的数据可以提交(commit)了。因为raft的提交数据的流程是这样的:首先节点将数据提议(propose)给leader,leader在将数据写入到自己的日志成功之后,再通过MsgApp把这些提议的数据广播给集群中的其他节点,在某一条日志数据收到超过半数(qurom)的节点同意之后,才认为是可以提交(commit)的。因此每次leader节点在收到一条MsgAppResp类型消息,同时msg.Reject又是false的情况下,都需要去检查当前有哪些日志是超过半数的节点同意的,再将这些可以提交(commit)的数据广播出去。而在没有数据可以提交的情况下,如果之前节点处于暂停状态,那么将继续向该节点同步数据。
-
最后还要做一个跟leader迁移相关的操作。如果该消息节点是准备迁移过去的新leader节点(raft.leadTransferee == msg.From),而且此时该节点上的Match索引已经跟旧的leader的日志最大索引一致,说明新旧节点的日志数据已经同步,可以正式进行集群leader迁移操作了。
MsgVote/MsgPreVote消息以及MsgVoteResp/MsgPreVoteResp消息
这里把这四种消息放在一起了,因为不论是Vote还是PreVote流程,其请求和应答时传输的数据都是一样的。
先看请求数据。
成员 | 类型 | 作用 |
---|---|---|
type | MsgVote/MsgPreVote | 节点投票给自己以进行新一轮的选举 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
term | uint64 | 任期ID |
index | uint64 | 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID |
logTerm | uint64 | 日志所处的任期ID |
context | bytes | 上下文数据 |
应答数据。
成员 | 类型 | 作用 |
---|---|---|
type | MsgVoteResp/MsgPreVoteResp | 投票应答消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
reject | bool | 是否拒绝 |
节点调用raft.campaign函数进行投票给自己进行一次新的选举,其中的参数CampaignType有以下几种类型:
- campaignPreElection:对应PreVote的场景。
- campaignElection:正常的选举场景。
- campaignTransfer:由于leader迁移发生的选举。如果是这种类型的选举,那么msg.Context字段保存的是“CampaignTransfer”`字符串,这种情况下会强制进行leader的迁移。
MsgVote还需要带上几个与本节点日志相关的数据(Index、LogTerm),因为raft算法要求,一个节点要成为leader的一个必要条件之一就是这个节点上的日志数据是最新的。
PreVote
这里需要特别解释一下PreVote的场景。
考虑到一种情况:当出现网络分区的时候,A、B、C、D、E五个节点被划分成了两个网络分区,A、B、C组成的分区和D、E组成的分区,其中的D节点,如果在选举超时到来时,都没有收到来自leader节点A的消息(因为网络已经分区),那么D节点认为需要开始一次新的选举了。
正常的情况下,节点D应该把自己的任期号term递增1,然后发起一次新的选举。由于网络分区的存在,节点D肯定不会获得超过半数以上的的投票,因为A、B、C三个节点组成的分区不会收到它的消息,这会导致节点D不停的由于选举超时而开始一次新的选举,而每次选举又会递增任期号。
在网络分区还没恢复的情况下,这样做问题不大。但是当网络分区恢复时,由于节点D的任期号大于当前leader节点的任期号,这会导致集群进行一次新的选举,即使节点D肯定不会获得选举成功的情况下(因为节点D的日志落后当前集群太多,不能赢得选举成功)。
为了避免这种无意义的选举流程,节点可以有一种PreVote的状态,在这种状态下,想要参与选举的节点会首先连接集群的其他节点,只有在超过半数以上的节点连接成功时,才能真正发起一次新的选举。
所以,在PreVote状态下发起选举时,并不会导致节点本身的任期号递增1,而只有在进行正常选举时才会将任期号加1进行选举。
MsgVote/MsgPreVote的处理流程
来看看节点对于投票消息的处理,这些处理有两处,但是都在raft.Step函数中。
- 首先该函数会判断msg.Term是否大于本节点的Term,如果消息的任期号更大则说明是一次新的选举。这种情况下将根据msg.Context是否等于“CampaignTransfer”字符串来确定是不是一次由于leader迁移导致的强制选举过程。同时也会根据当前的electionElapsed是否小于electionTimeout来确定是否还在租约期以内。如果既不是强制leader选举又在租约期以内,那么节点将忽略该消息的处理,在论文4.2.3部分论述这样做的原因,是为了避免已经离开集群的节点在不知道自己已经不在集群内的情况下,仍然频繁的向集群内节点发起选举导致耗时在这种无效的选举流程中。如果以上检查流程通过了,说明可以进行选举了,如果消息类型还不是MsgPreVote类型,那么此时节点会切换到follower状态且认为发送消息过来的节点msg.From是新的leader。
case m.Term > r.Term:
// 消息的Term大于节点当前的Term
lead := m.From
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
// 如果收到的是投票类消息
// 当context为campaignTransfer时表示强制要求进行竞选
force := bytes.Equal(m.Context, []byte(campaignTransfer))
// 是否在租约期以内
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// 如果非强制,而且又在租约期以内,就不做任何处理
// 非强制又在租约期内可以忽略选举消息,见论文的4.2.3,这是为了阻止已经离开集群的节点再次发起投票请求
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
return nil
}
// 否则将lead置为空
lead = None
}
switch {
// 注意Go的switch case不做处理的话是不会默认走到default情况的
case m.Type == pb.MsgPreVote:
// Never change our term in response to a PreVote
// 在应答一个prevote消息时不对任期term做修改
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
// 变成follower状态
r.becomeFollower(m.Term, lead)
}
- 在raft.Step函数的后面,会判断消息类型是MsgVote或者MsgPreVote来进一步进行处理。其判断条件是以下两个条件同时成立:
- 当前没有给任何节点进行过投票(r.Vote == None ),或者消息的任期号更大(m.Term > r.Term ),或者是之前已经投过票的节点(r.Vote == m.From))。这个条件是检查是否可以还能给该节点投票。
- 同时该节点的日志数据是最新的(r.raftLog.isUpToDate(m.Index, m.LogTerm) )。这个条件是检查这个节点上的日志数据是否足够的新。 只有在满足以上两个条件的情况下,节点才投票给这个消息节点,将修改raft.Vote为消息发送者ID。如果不满足条件,将应答msg.Reject=true,拒绝该节点的投票消息。
case pb.MsgVote, pb.MsgPreVote:
// 收到投票类的消息
// The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should
// always equal r.Term.
if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
// 如果当前没有给任何节点投票(r.Vote == None)或者投票的节点term大于本节点的(m.Term > r.Term)
// 或者是之前已经投票的节点(r.Vote == m.From)
// 同时还满足该节点的消息是最新的(r.raftLog.isUpToDate(m.Index, m.LogTerm)),那么就接收这个节点的投票
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
// 保存下来给哪个节点投票了
r.electionElapsed = 0
r.Vote = m.From
}
} else {
// 否则拒绝投票
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true})
}
MsgVoteResp/MsgPreVoteResp的处理流程
来看节点收到投票应答数据之后的处理。
- 节点调用raft.poll函数,其中传入msg.Reject参数表示发送者是否同意这次选举,根据这些来计算当前集群中有多少节点给这次选举投了同意票。
- 如果有半数的节点同意了,如果选举类型是PreVote,那么进行Vote状态正式进行一轮选举;否则该节点就成为了新的leader,调用raft.becomeLeader函数切换状态,然后开始同步日志数据给集群中其他节点了。
- 而如果半数以上的节点没有同意,那么重新切换到follower状态。
case myVoteRespType:
// 计算当前集群中有多少节点给自己投了票
gr := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
switch r.quorum() {
case gr: // 如果进行投票的节点数量正好是半数以上节点数量
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
// 变成leader
r.becomeLeader()
r.bcastAppend()
}
case len(r.votes) - gr: // 如果是半数以上节点拒绝了投票
// 变成follower
r.becomeFollower(r.Term, None)
}
MsgHeartbeat/MsgHeartbeatResp消息
心跳请求消息。
成员 | 类型 | 作用 |
---|---|---|
type | MsgHeartbeat | 用于leader向follower发送心跳消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
commit | uint64 | 提交日志索引 |
context | bytes | 上下文数据,在这里保存一致性读相关的数据 |
心跳请求应答消息。
成员 | 类型 | 作用 |
---|---|---|
type | MsgHeartbeatResp | 用于follower向leader应答心跳消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 本节点ID |
context | bytes | 上下文数据,在这里保存一致性读相关的数据 |
leader中会定时向集群中其他节点发送心跳消息,该消息的作用除了探测节点的存活情况之外,还包括:
- commit成员:leader选择min[节点上的Match,leader日志最大提交索引],用于告知节点哪些日志可以进行提交(commit)。
- context:与线性一致性读相关,后面会进行解释。
MsgUnreachable消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgUnreachable | 用于应用层向raft库汇报某个节点当前已不可达 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 不可用的节点ID |
仅leader才处理这类消息,leader如果判断该节点此时处于正常接收数据的状态(ProgressStateReplicate),那么就切换到探测状态。
MsgSnapStatus消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgSnapStatus | 用于应用层向raft库汇报某个节点当前接收快照状态 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 节点ID |
reject | bool | 是否拒绝 |
仅leader处理这类消息:
- 如果reject为false:表示接收快照成功,将切换该节点状态到探测状态。
- 否则接收失败。
MsgCheckQuorum消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgCheckQuorum | 用于leader检查集群可用性的消息 |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 节点ID |
leader的定时器函数,在超过选举时间时,如果当前打开了raft.checkQuorum开关,那么leader将给自己发送一条MsgCheckQuorum消息,对该消息的处理是:检查集群中所有节点的状态,如果超过半数的节点都不活跃了,那么leader也切换到follower状态。
MsgTransferLeader消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgTransferLeader | 用于迁移leader |
to | uint64 | 消息接收者的节点ID |
from | uint64 | 注意这里不是发送者的ID了,而是准备迁移过去成为新leader的节点ID |
这类消息follower将转发给leader处理,因为follower并没有修改集群配置状态的权限。
leader在收到这类消息时,是以下的处理流程。
- 如果当前的raft.leadTransferee成员不为空,说明有正在进行的leader迁移流程。此时会判断是否与这次迁移是同样的新leader ID,如果是则忽略该消息直接返回;否则将终止前面还没有完毕的迁移流程。
- 如果这次迁移过去的新节点,就是当前的leader ID,也直接返回不进行处理。
- 到了这一步就是正式开始这一次的迁移leader流程了,一个节点能成为一个集群的leader,其必要条件是上面的日志与当前leader的一样多,所以这里会判断是否满足这个条件,如果满足那么发送MsgTimeoutNow消息给新的leader通知该节点进行leader迁移,否则就先进行日志同步操作让新的leader追上旧leader的日志数据。
case pb.MsgTransferLeader:
leadTransferee := m.From
lastLeadTransferee := r.leadTransferee
if lastLeadTransferee != None {
// 判断是否已经有相同节点的leader转让流程在进行中
if lastLeadTransferee == leadTransferee {
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
r.id, r.Term, leadTransferee, leadTransferee)
// 如果是,直接返回
return
}
// 否则中断之前的转让流程
r.abortLeaderTransfer()
r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
}
// 判断是否转让过来的leader是否本节点,如果是也直接返回,因为本节点已经是leader了
if leadTransferee == r.id {
r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
return
}
// Transfer leadership to third party.
r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
r.electionElapsed = 0
r.leadTransferee = leadTransferee
if pr.Match == r.raftLog.lastIndex() {
// 如果日志已经匹配了,那么就发送timeoutnow协议过去
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
// 否则继续追加日志
r.sendAppend(leadTransferee)
}
MsgTimeoutNow消息
成员 | 类型 | 作用 |
---|---|---|
type | MsgTimeoutNow | leader迁移时,当新旧leader的日志数据同步后,旧leader向新leader发送该消息通知可以进行迁移了 |
to | uint64 | 新的leader ID |
from | uint64 | 旧的leader的节点ID |
新的leader节点,在还未迁移之前仍然是follower,在收到这条消息后,就可以进行迁移了,此时会调用前面分析MsgVote时说过的campaign函数,传入的参数是campaignTransfer,表示这是一次由于迁移leader导致的选举流程。
MsgReadIndex和MsgReadIndexResp消息
这两个消息一一对应,使用的成员也一样,在后面分析读一致性的时候再详细解释。
成员 | 类型 | 作用 |
---|---|---|
type | MsgReadIndex | 用于读一致性的消息 |
to | uint64 | 接收者节点ID |
from | uint64 | 发送者节点ID |
entries | Entry | 日志条目数组 |
其中,entries数组只会有一条数据,带上的是应用层此次请求的标识数据,在follower收到MsgReadIndex消息进行应答时,同样需要把这个数据原样带回返回给leader,详细的线性读一致性的实现在后面展开分析。
节点状态
每个raft的节点,分为以下三种状态:
- candidate:候选人状态,节点切换到这个状态时,意味着将进行一次新的选举。
- follower:跟随者状态,节点切换到这个状态时,意味着选举结束。
- leader:领导者状态,所有数据提交都必须先提交到leader上。
每一个状态都有其对应的状态机,每次收到一条提交的数据时,都会根据其不同的状态将消息输入到不同状态的状态机中。同时,在进行tick操作时,每种状态对应的处理函数也是不一样的。
所以raft结构体中将不同的状态,及其不同的处理函数独立出来几个成员变量:
成员 | 作用 |
---|---|
state | 保存当前节点状态 |
tick函数 | tick函数,每个状态对应的tick函数不同 |
step函数 | 状态机函数,同样每个状态对应的状态机也不相同 |
raft库中提供几个成员函数becomeCandidate、becomeFollower、becomeLeader分别进入这几种状态的,这些函数中做的事情,概况起来就是:
- 切换raft.state成员到对应状态。
- 切换raft.tick函数到对应状态的处理函数。
- 切换raft.step函数到对应状态的状态机。
选举流程
raft算法的第一步是首先选举出一个leader出来,在没有产生leader的情况下,其他数据提交等操作都无从谈起,所以这里首先从选举的流程开始谈起。
发起选举的节点
只有在candidate或者follower状态下的节点,才有可能发起一个选举流程,而这两种状态的节点,其对应的tick函数都是raft.tickElection函数,这个函数的主要流程是:
- 将选举超时递增1。
- 当选举超时到期,同时该节点又在集群中时,说明此时可以进行一轮新的选举。此时会向本节点发送HUP消息,这个消息最终会走到状态机函数raft.Step中进行处理。
明白了raft.tickElection函数的作用,可以来看选举流程了:
-
节点启动时都以follower状态启动,同时随机选择自己的选举超时时间。之所以每个节点随机选择自己的超时时间,是为了避免同时有两个节点同时进行选举,这种情况下会出现没有任何一个节点赢得半数以上的投票从而这一轮选举失败,继续再进行下一轮选举
-
在follower的tick函数tickElection函数中,当选举超时到时,节点向自己发送HUP消息。
-
在状态机函数raft.Step函数中,在收到HUP消息之后,节点首先判断当前有没有没有apply的配置变更消息,如果有就忽略该消息。其原因在于,当有配置更新的情况下不能进行选举操作,即要保证每一次集群成员变化时只能同时变化一个,不能同时有多个集群成员的状态发生变化。
-
否则进入campaign函数中进行选举:首先将任期号+1,然后广播给其他节点选举消息,带上的其它字段包括:节点当前的最后一条日志索引(Index字段),最后一条日志对应的任期号(LogTerm字段),选举任期号(Term字段,即前面已经进行+1之后的任期号),Context字段(目的是为了告知这一次是否是leader转让类需要强制进行选举的消息)。
-
如果在一个选举超时之内,该发起新的选举流程的节点,得到了超过半数的节点投票,那么状态就切换到leader状态,成为leader的同时,leader将发送一条dummy的append消息,目的是为了提交该节点上在此任期之前的值(见疑问部分如何提交之前任期的值)
收到选举消息的节点
-
当收到任期号大于当前节点任期号的消息,同时该消息类型如果是选举类的消息(类型为prevote或者vote)时,会做以下判断:
-
首先会判断一下该消息是否为强制要求进行选举的类型(context为campaignTransfer,context为这种类型时表示在进行leader转让,流程见下面的leader转让流程)
-
判断当前是否在租约期以内,判断的条件包括:checkQuorum为true,当前节点保存的leader不为空,没有到选举超时,前面这三个条件同时满足。
-
如果不是强制要求选举,同时又在租约期以内,那么就忽略该选举消息返回不进行处理,这么做是为了避免出现那些离开集群的节点,频繁发起新的选举请求(见论文4.2.3)。
-
如果不是前面的忽略选举消息的情况,那么除非是prevote类的选举消息,在收到其他消息的情况下,该节点都切换为follower状态。
-
此时需要针对投票类型中带来的其他字段进行处理了,需要同时满足以下两个条件:
-
只有在没有给其他节点进行过投票,或者消息的term任期号大于当前节点的任期号,或者之前的投票给的就是这个发出消息的节点
-
进行选举的节点,它的日志是更新的,条件为:logterm比本节点最新日志的任期号大,在两者相同的情况下,消息的index大于等于当前节点最新日志的index,即总要保证该选举节点的日志比自己的大。
-
只有在同时满足以上两个条件的情况下,才能同意该节点的选举,否则都会被拒绝。这么做的原因是:保证最后能胜出来当新的leader的节点,它上面的日志都是最新的。
集群成员变化流程
大原则是不能同时进行两个以上的成员变更,因为同时进行两个以上的成员变更,可能会出现集群中有两个leader即导致了集群分裂的情况出现。
成员变化分为以下几种情况:成员删减、leader转让,下面分开讲解。
一般的成员删减
成员变化操作做为日志的特殊类型,当可以进行commit的情况下,各个节点拿出该消息进行节点内部的成员删减操作。
leader转让
-
旧leader在接收到转让leader消息之后,会做如下的判断: a. 如果新的leader上的日志,已经跟当前leader上的日志同步了,那么发送timeout消息。 b. 否则继续发append消息到新的leader上,目的为了让其能够与旧leader日志同步。
-
当旧leader处于转让leader状态时,将停止接收新的prop消息,这样就避免出现在转让过程中新旧leader一直日志不能同步的情况。
-
当旧leader收到append消息应答时,如果当前处于leader转让状态,那么会判断新的leader日志是否已经与当前leader同步,如果是将发送timeout消息。
-
新的leader当收到timeout消息时,将使用context为campaignTransfer的选举消息发起新一轮选举,当context为该类型时,此时的选举是强制进行的(见前面的选举流程)。
如何做到线性一致性?
线性一致性(Linearizable Read)通俗来讲,就是读请求需要读到最新的已经commit的数据,不会读到老数据。
由于所有的leader和follower都能处理客户端的读请求,所以存在可能造成返回读出的旧数据的情况:
-
leader和follower之间存在状态差,因为follower总是由leader同步过去的,可能会返回同步之前的数据。
-
如果发生了网络分区,某个leader实际上已经被隔离出了集群之外,但是该leader并不知道,如果还继续响应客户端的读请求,也可能会返回旧的数据。
因此,在接收到客户端的读请求时,需要保证返回的数据都是当前最新的。
ReadOnlySafe方式
leader在接收到读请求时,需要向集群中的超半数server确认自己仍然是当前的leader,这样它返回的就是最新的数据。
在etcd-raft中,为了实现ReadOnlySafe,有如下的数据结构:
type ReadState struct {
Index uint64
RequestCtx []byte
}
其中:
- Index:接收到该读请求时,当前节点的commit索引。
- RequestCtx:客户端读请求的唯一标识。
ReadState结构体用于保存读请求到来时的节点状态。
type readIndexStatus struct {
req pb.Message
index uint64
acks map[uint64]struct{}
}
readIndexStatus数据结构用于追踪leader向follower发送的心跳信息,其中:
- req:保存原始的readIndex请求。
- index:leader当前的commit日志索引。
- acks:存放该readIndex请求有哪些节点进行了应答,当超过半数应答时,leader就可以确认自己还是当前集群的leader。
type readOnly struct {
option ReadOnlyOption
pendingReadIndex map[string]*readIndexStatus
readIndexQueue []string
}
readOnly用于管理全局的readIndx数据,其中:
- option:readOnly选项。
- pendingReadIndex:当前所有待处理的readIndex请求,其中key为客户端读请求的唯一标识。
- readIndexQueue:保存所有readIndex请求的请求唯一标识数组。
有了以上的数据结构介绍,后面是流程介绍:
-
server收到客户端的读请求,此时会调用raft.ReadIndex函数发起一个MsgReadIndex的请求,带上的参数是客户端读请求的唯一标识(此时可以对照前面分析的MsgReadIndex及其对应应答消息的格式)。
-
follower将向leader直接转发MsgReadIndex消息,而leader收到不论是本节点还是由其他server发来的MsgReadIndex消息,其处理都是:
a. 首先如果该leader在成为新的leader之后没有提交过任何值,那么会直接返回不做处理。
b. 调用r.readOnly.addRequest(r.raftLog.committed, m)保存该MsgreadIndex请求到来时的commit索引。
c. r.bcastHeartbeatWithCtx(m.Entries[0].Data),向集群中所有其他节点广播一个心跳消息MsgHeartbeat,并且在其中带上该读请求的唯一标识。
d. follower在收到leader发送过来的MsgHeartbeat,将应答MsgHeartbeatResp消息,并且如果MsgHeartbeat消息中有ctx数据,MsgHeartbeatResp消息将原样返回这个ctx数据。
e. leader在接收到MsgHeartbeatResp消息后,如果其中有ctx字段,说明该MsgHeartbeatResp消息对应的MsgHeartbeat消息,是收到ReadIndex时leader消息为了确认自己还是集群leader发送的心跳消息。首先会调用r.readOnly.recvAck(m)函数,根据消息中的ctx字段,到全局的pendingReadIndex中查找是否有保存该ctx的带处理的readIndex请求,如果有就在acks map中记录下该follower已经进行了应答。
f. 当ack数量超过了集群半数时,意味着该leader仍然还是集群的leader,此时调用r.readOnly.advance(m)函数,将该readIndex之前的所有readIndex请求都认为是已经成功进行确认的了,所有成功确认的readIndex请求,将会加入到readStates数组中,同时leader也会向follower发送MsgReadIndexResp。
g. follower收到MsgReadIndexResp消息时,同样也会更新自己的readStates数组信息。
h. readStates数组的信息,将做为ready结构体的信息更新给上层的raft协议库的使用者。
需要特别说明的是,处理读请求时,实际上leader需要确保当前自己是不是leader、该读请求对应的commit索引是否得到了半数投票,而当一个节点刚成为leader的时候,如果没有提交过任何数据,那么在它所在的这个任期(term)内的commit索引当时是并不知道的,因此在成为leader之后,需要马上提交一个no-op的空日志,这样拿到该任期的第一个commit索引。
上图中,在leader收到MsgReadIndex后:
- 向readOnly中添加与这次请求ctx相关的数据:
- 向pendingReadIndex中添加以ctx为key的readIndexStatus,其中保存了当前的commitIndex、原始的MsgReadIndex消息、以及用于存放有哪些节点应答了该消息的acks数组。
- 向readIndexQueue数组中添加ctx。
- leader向集群中其他节点广播MsgHeartbeat消息,其中带上这次MsgReadIndex的ctx。
在这之后,follower应答leader的MsgHeartbeat消息,如果消息中存在ctx字段都会带上应答,于是leader中的处理:
- 收到MsgHeartbeatResp消息之后,如果发现其中有ctx,就去计算应答有没有超过半数,没有超过半数则返回。
- 走到这里就是超过半数应答了,此时拿到新的readIndexStatus数组。
- 遍历前面拿到的readIndexStatus数组,生成新的readStates数组。
- 放到Ready中下一次给客户端。
总结一下,分为四步:
- leader检查自己在当前任期有没有commit过一条entry,没有提交过则不允许处理readIndex请求。
- leader记录下来收到readIndex请求时候的commit index,然后leader向集群中所有节点发心跳广播,其中带上readIndex相关的ctx字段。
- 当超过半数的节点应答了第二部的心跳消息,说明此时leader还是集群的leader。
- 生成新的readStates数组放入Ready结构体中,等待下一次客户端来获取该数据。
杂项
节点的几种状态
一个节点在leader上保存的状态有:
const (
ProgressStateProbe ProgressStateType = iota
ProgressStateReplicate
ProgressStateSnapshot
)
以下来分开解释这几种状态。
ProgressStateProbe
探测状态,当节点拒绝了最近的append消息时,那么就会进入探测状态,此时leader会试图继续往前追述该节点的日志从哪里开始丢失的,让该节点的日志能跟leader同步上。在probe状态时,只能向它发送一次append消息,此后除非状态发生变化,否则就暂停向该节点发送新的append消息了。
只有在以下情况才会恢复取消暂停状态(调用Progress的resume函数):
- 收到该节点的心跳消息。
- 该节点成功应答了前面的最后一条append消息。
至于Probe状态,只有在该节点成功应答了Append消息之后,在leader上保存的索引值发生了变化,才会修改其状态切换到Replicate状态。
ProgressStateReplicate
正常接收副本数据的状态,当处于该状态时,leader在发送副本消息之后,就修改该节点的next索引为发送消息的最大索引+1
ProgressStateSnapshot
接收快照状态。 当leader向某个follower发送append消息,试图让该follower状态跟上leader时,发现此时leader上保存的索引数据已经对不上了,比如leader在index为10之前的数据都已经写入快照中了,但是该follower需要的是10之前的数据,此时就会切换到该状态下,发送快照给该follower。
因为快照数据可能很多,不知道会同步多久,所以单独把这个状态抽象出来。
当快照数据同步追上之后,并不是直接切换到Replicate状态,而是首先切换到Probe状态。
Progress上的数据索引
Progress结构体中有两个保存该follower节点日志索引的数据,其中:
- Next:保存下一次leader发送append消息给该follower时的日志索引。
- Match:保存该follower节点上的最大日志索引。
在正常情况下,Next = Match + 1,也就是Next总是节点当前保存最大日志索引的下一条索引。
有两种情况除外:
- 接收快照状态:此时Next = max(pr.Match+1, pendingSnapshot+1)
- 当该follower不在Replicate状态时,说明不是正常的接收副本状态。此时当leader与follower同步leader上的日志时,可能出现覆盖的情况,即此时follower上面假设Match为3,但是索引为3的数据会被leader覆盖,此时Next指针可能会一直回溯到与leader上日志匹配的位置,再开始正常同步日志,此时也会出现Next != Match + 1的情况出现。
如上图所示,节点s1上最大日志索引为2,即Match = 2,Next = 3。 但是,由于新选出来的leader s2,其最大日志索引为3,此时s3需要同步日志到s1上,发现s1上的日志与自己的不匹配,所以会一直找到两者最开始匹配的索引位置,即最终找到索引1,因此会保存s1的Next索引为1,而Match还是2(因为此时还没有修改s1上的日志)。当最终s1上的数据与s2同步时,此时Next = 4,Match=3。
流量控制
Progress结构体中,使用另一个inflights的数据结构用于流量控制。 该结构体使用一个固定大小的循环缓冲区来控制给一个节点同步数据的流量控制,每当给该follower发送同步消息时,就占用该缓冲区的一个空间;反之,当收到该follower的成功接收了该同步消息的应答之后,就释放缓冲区的空间。
当该缓冲区数据饱和时,将暂停继续同步数据到该follower。
##快照、日志的存储