现在已知的 Golang 版本的 RAFT 的开源实现主要有两个:一个是 CoreOS 的 etcd 中的实现,使用的项目有比如 tidb、cockroachdb 等;另外一个是 hashcorp 的 RAFT 实现,使用的项目有比如 consul、InfluxDB 等。
相比而言,前者只实现了一个整体框架,很多的功能需要用户实现,难度增加但是更加灵活;而后者则是完整的实现,WAL、SnapShot、存储、序列化等。
简介
整体来说,该库实现了 RAFT 协议核心内容,如 append log、选主逻辑、snapshot、成员变更等;但该库没有实现消息传输和接收,只会把待发送消息保存在内存中,通过用户自定义的网络传输层取出消息并发送出去,并且在网络接收端,需要调一个库函数,用于将收到的消息传入库。
同时,该库定义了一个 Storage 接口,需要库的使用者自行实现。
示例程序
RAFT 协议的实现主要包括了四部分:协议逻辑、存储、消息序列化和网络传输,而 ETCD 对应的 RAFT 库只实现了最核心算法。
源码可以直接下载 Github coreos/etcd 其中有一个简单的示例 contrib/raftexample,这是一个内存的 KV 存储引擎。
示例中的 wal 和 snapshot 实际上使用的是 ETCD 中的实现。
使用
看下如何在系统中使用。
编译
直接修改编译脚本 build 中的编译命令,然后直接运行 ./build
即可。
go build -o "${out}/raftexample" ${REPO_PATH}/contrib/raftexample || return
测试
可以在一个节点上启动三个服务进程(也可以只启动一个)。
$ raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380
$ raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22380
$ raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32380
如上,通过参数指定整个集群的节点数,启动后会发起一次选举过程。
为了调试方便,可以通过 [goreman]({{ site.production_url }}/post/golang-introduce.html#goreman) 启动。
发送数据
然后,通过如下方式发送和获取数据。
curl -L http://127.0.0.1:12380/my-key -XPUT -d hello
curl -L http://127.0.0.1:12380/my-key
结点管理
可以通过以下方式将一个新节点加入集群:
$ curl -L http://127.0.0.1:12380/4 -XPOST -d http://127.0.0.1:42379
$ raftexample --id 4 --port 42380 --join \
--cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379,http://127.0.0.1:42379
或者删除节点。
$ curl -L http://127.0.0.1:12380/3 -XDELETE
压测
可以通过如下的脚本生成压测的脚本。
#!/bin/sh -e
for ((i=1; i<=150; i ++)); do
uuid=`uuidgen`
echo "curl -L http://127.0.0.1:12380/${uuid} -XPUT -d '${uuid}-hello'"
done
数据结构
简单介绍下常见的数据结构。
KVStore
通过 kvstore 抽象了应用的全部相关内容。
type kvstore struct {
proposeC chan<- string
mu sync.RWMutex
kvStore map[string]string
snapshotter *snap.Snapshotter
}
最核心的成员有:
- proposeC 这是应用和底层 RAFT 库之间的通信管道(Channel),所有对应用的更新请求都会由应用通过该管道向底层 RAFT 库传递;
- kvStore:内存状态机,存储应用的状态数据;
- snapshotter 应用管理 Snapshot 的接口。
RaftNode
该结构用来衔接底层 RAFT 协议以及应用层,通过该结构可以简化 RAFT 底层的细节,降低系统耦合度。该结构需要处理的任务包括:
- 应用的更新请求传递给底层 RAFT 协议;
- RAFT 已提交的请求传输给应用以更新应用的状态机;
- 处理 RAFT 组件产生的指令,如选举指令、数据复制指令、集群节点变更指令等;
- 处理 WAL 日志;
- 将底层 RAFT 组件的指令通过网络传输至集群其它节点等。
相较于应用来说,该结构 (type raftNode struct) 功能的实现更为复杂,其核心数据结构定义如下:
- proposeC 应用将客户的更新请求传递至底层 RAFT 组件的管道;
- commitC 底层 RAFT 组件通知应用层准备提交的通道;
Ready 数据
通过 type Ready struct
定义可以知道,其中保存了多种状态的数据:
- 什么时候可以读。ReadState 用来支持 Linearizable Read。
- 需要持久话的状态。HardState、Entries 需要在正式发送数据之前持久化。
- 需要执行SnapShot的数据。Snapshot 。
- 已经提交的数据,可以应用到状态机。CommittedEntries 。
- 需要发送到其它机器的消息。Messages 需要在处理完持久化数据之后处理。
在 node.run()[raft/node.go]
中,会通过 newReady()
新建 Ready 对象,其中包含了上述的成员内容,那么新建 Ready 对象无非就是如何构建其中的成员变量。
另外,在示例代码 raftNode.serveChannels()
中,可以将 Ready 对象打印出来。
type Ready struct {
*SoftState
pb.HardState // 在发送信息前需要持久化的状态
ReadStates []ReadState
Entries []pb.Entry // 通过raftLog.unstableEntries()读取的是raftLog.unstable.entries中的数据
Snapshot pb.Snapshot
CommittedEntries []pb.Entry // 包括了所有已经持久化到日志但是还没有应用到状态机的数据
// raftLog.nextEnts() raft/log.go 用来获取所有需要提交的日志,用来应用到状态机
Messages []pb.Message // 包含了应该发送给对端的数据,也就是直接读取的raft.msgs[]中缓存的数据
MustSync bool
}
type raftLog struct {
storage Storage // 包含了上次snapshot之后所有持久化的日志
unstable unstable // 未提交的日志,包括snapshot
committed uint64 // 已经在多数节点上持久化的最大日志号
applied uint64 // 在本节点已经应用到状态机的日志号
logger Logger
}
在启动时会将已经写入到 WAL 中的数据写入到 Ready CommittedEntries 。
处理流程
RAFT 协议是一种在多节点组成的集群之间进行状态同步的协议,示例是通过 newRaftNode()
新建一个 raftNode 抽象对象,在创建时需要指定其它节点的 IP(peers)、该节点的 ID(id) 等信息。
main() main.go
|-make(chan string) 新建proposeC管道,用来将用户层数据发送给RAFT协议层
|-make(chan raftpb.ConfChange) 新建confChangeC管道,用来将配置修改信息发送给RAFT协议层
|-newRaftNode() raft.go 返回结构体会作为底层RAFT协议与上层应用的中间结合体
| | 同时会返回commitC errorC管道,用来接收请求和错误信息
| |-raftNode() <<<1>>>新建raftNode对象,重点proposeC
| |-raftNode.startRaft() [协程] 启动示例中的代码
| |-os.Mkdir() 如果snapshot目录不存在则创建
| |-snap.New() snap/snapshotter.go 只是实例化一个对象,并设置其中的dir成员
| |-wal.Exist() wal/util.go 判断WAL日志目录是否存在,用来确定是否是第一次启动
| |-raftNode.replayWAL() raft.go 开始读取WAL日志,并赋值到raftNode.wal中
| | |-raftNode.loadSnapshot()
| | | |-snap.Snapshotter.Load() snap/snapshotter.go 中的Load()函数
| | | |-Snapshotter.snapNames() 会打开目录并遍历目录下的文件,按照文件名进行排序
| | | |-Snapshotter.loadSnap()
| | | |-Snapshotter.Read() 会读取文件并计算CRC校验值,判断文件是否合法
| | |-raftNode.openWAL() 打开snapshot,如果不存在则创建目录,否则加载snapshot中保存的值并尝试加载
| | | |-wal.Open() wal/wal.go 打开指定snap位置的WAL日志,注意snap需要持久化到WAL中才可以
| | | |-wal.openAtIndex() 打开某个snapshot处的日志,并读取之后
| | | |-readWalNames() wal/util.go读取日志目录下的文件,会检查命名格式
| | | |-searchIndex() 查找指定的index序号
| | |-ReadAll() 读取所有日志,真正开始读取WAL
| | |-raft.NewMemoryStorage() 使用ETCD中的内存存储
| | |-raft.NewMemoryStorage() raft/storage.go 新建内存存储
| | |--->>> 从这里开始的三步操作是文档中启动节点前要求的
| | |-MemoryStorage.ApplySnapshot() 这里实际上只更新snapshot和新建ents成员,并未做其它操作
| | |-MemoryStorage.SetHartState() 更新hardState成员
| | |-MemoryStorage.Append() 添加到ents中
| | |-raftNode.lastIndex 更新成员变量
| |
| |-raft.Config{} raft/raft.go 构建RAFT核心的配置项,详细可以查看源码中的定义
| |-raft.RestartNode() raft/node.go 如果已经安装过WAL则直接重启Node,这最常见场景
| | |-raft.newRaft() raft/raft.go
| | | |-raft.becomeFollower() 启动后默认先成为follower 【became follower at term】
| | | | 返回新建对象 【newRaft】
| | |-newNode() raft/node.go 这里只是实例化一个node对象,并未做实际初始化操作
| | |-node.run() 启动一个后台协程开始运行
| |
| |-raft.StartNode() 第一次安装,则重新部署
| |
| |-rafthttp.Transport{} 传输层的配置参数
| |-transport.Start() rafthttp/transport.go 启动HTTP服务
| | |-newStreamRoundTripper() 如下的实现是对http库的封装,保存在pkg/transport目录下
| | | |-NewTimeoutTransport()
| | | |-NewTransport()
| | | |-http.Transport{} 调用http库创建实例
| | |-NewRoundTripper()
| |-transport.AddPeer() rafthttp/transport.go 添加对端服务,如果是三个节点,会添加两个
| | |-startPeer() rafthttp/peer.go 【starting peer】
| | | |-pipeline.start() rafthttp/pipeline.go
| | | | |-pipeline.handle() 这里会启动一个协程处理
| | | |---> 【started HTTP pipelining with peer】
| | | |-peer{} 新建对象
| | | | |-startStreamWriter() 会启动两个streamWriter
| | | | |-streamWriter.run() 启动协程处理 【started streaming with peer (writer)】
| | | | | <<<cw.connc>>>
| | | | |-cw.status.active() 与对端已经建立链接【peer 1 became active】
| | | | |---> 【established a TCP streaming connection with peer (... writer)】
| | | |-streamReader.start() 这里会启动msgAppV2Reader、msgAppReader两个streamReader读取
| | | |-streamReader.run() 启动协程处理,这里是一个循环处理 【started streaming with peer (... reader)】
| | |---> 【started peer】
| |
| |-raftNode.serveRaft() [协程] 主要是启动网络监听
| |-raftNode.serveChannels() [协程] raft.go 真正的业务处理,在协程中监听用户请求、配置等命令
| | |-raftStorage.Snapshot() 获取snapshot
| | |-raft.Node.Propose() 阻塞等待该用户请求被RAFT状态机接受
| | |-raft.Node.ProposeConfChange() 发送配置请求
| |
| |-raft.Node.Tick() 另外的协程处理RAFT组件的同步信息
|
|-newKVStore() kvstore.go 创建内存KV存储结构
| |-kvstore{} 实例化KVStore存储对象
| |-kvstore.readCommits() 会启动一个协程,也是存储的核心,用于读取已经提交的数据
| | 这里实际上调用了两次,第一次是函数内调用,第二次是协程
| |-snapshot.Load() 第一次commitC中没有数据,实际上是加载snapshot
| |-recoverFromSnapshot() 从snapshot中恢复
| | 接下来是协程的处理
| |-gob.NewDecoder() 反序列化
| |-kvStore[] 保存到内存中
|
|-serveHttpKVAPI() 启动对外提供服务的HTTP端口
|-srv.ListenAndServe() 真正启动客户端的监听服务
一般是定时器超时
raft.Step()
| <<<pb.MsgHup>>>
|---> 【is starting a new election at term】
|-raft.campaign()
|-raft.becomeCandidate() 进入到选举状态,也可以是PreCandidate
|-raft.poll() 首先模拟收到消息给自己投票
|-raft.quorum() 因为集群可能是单个节点,这里会检查是否满足条件,如果是
| |-raft.becomeLeader() 如果满足则成为主
|-raft.send() 发送选举请求,消息类型可以是MsgPreVote或者MsgVote
|---> 【sent MsgVote request】
raft.stepCandidate()
|-raft.poll() 【received MsgVoteResp from】
| |-raft.becomeLeader() 如果满足多数派
| | |-raft.appendEntry() 添加一个空日志,记录成为主的事件
| | |---> 【became leader at term】
| |-raft.bcastAppend() 广播发送
| |-raft.sendAppend()
|---> 【has received 2 MsgVoteResp votes and 0 vote rejections】
node.run()
|---> 【raft.node ... elected leader at term ...】
其中 RestartNode()
与 StartNode()
的区别在于,前者从日志文件中读取配置,而后者需要从命令行中传参。
初始化
启动流程主要分为了三步:
- 初始化 RAFT
- 应用初始化
- 应用开启对外服务
应用和 RAFT 组件之间是通过 Channel 进行信息传递。
上述的 HTTP 端口中真正处理请求的函数为 ServeHTTP()
函数,下面会挨个介绍。
新建raftNode对象
在初始化时,为了与 RAFT 内的协议层进行通讯,需要提供 4 个 Channel,分别是:
proposeC := make(chan string)
confChangeC := make(chan raftpb.ConfChange)
commitC := make(chan *string)
errorC := make(chan error)
其中,前两个在创建 raftNode
前创建传入,就是数据的入口;而后两个则是在创建完成后返回,相当于数据的出口。
- proposeC 用于向RAFT协议层提交写请求,也就是 Propose;
- confChangeC 用于向RAFT协议层提交配置变更请求,也就是 ProposeConfChange;
- commitC 把已经提交的entries从RAFT协议层暴露给用户 State Machine;
- errorC 让用户可以及时处理RAFT协议层抛出的错误信息。
写数据
简单来说就是,用户发送一个 PUT 请求,用来写入 KV 内存数据,可以分为如下步骤。
http PUT -1-> kvStore.Propose -2-> proposeC -3-> raft -4-> commitC -5-> map[string] string
HTTP 请求数据调用 kvStore.Propose()
方法把请求数据通过 proposeC
管道发送给 RAFT 协议核心,在 RAFT 协议中经过一系列的操作后再把数据通过 commitC
这个管道暴露出来。
初始化时,会启动一个协程来消费 commitC
这个管道,也就是把已经提交的结果最终写入到内存中的 map[string]stringe
里边去。
处理流程
如上所述,HTTP 真正处理请求是在 ServeHTTP()
函数中,包括了 PUT(增加数据) GET(查看数据) POST(修改配置) DELETE(删除数据) 四种类型的请求处理。
对于客户端的更新请求,首先通过 HTTP 协议传输给应用,目前无法直接处理更新 KVStore,需要先提交至 RAFT 组件在集群内部对本次提交达成一致。也即是,要将这次请求通过 proposeC 管道将请求发送给 raftNode 结构。
ServeHTTP() httpapi.go
|====> PUT方法
|-ioutil.ReadAll() 从HTTP中读取请求
|-kvstore.Propose() kvstore.go 正式提交请求,阻塞直到RAFT状态机提交成功
| |-glob.NewEncoder() 序列化
| |-s.proposeC <- buf.String() 通过proposeC管道发送请求到RAFT核心,会阻塞直到返回
|
|-http.ResponseWriter.WriteHeader() 返回数据结果
|
|====> GET方法
|-kvstore.Lookup() 查找并返回数据
在发送到 proposeC 之后,实际上会开始调用 serveChannels()
启动的协程中,然后会一直阻塞直到该请求返回结果。
接下来就是 RAFT 组件的核心处理部分,也即是提供的 Propose()
API 接口,这里暂时不讨论。
提交数据
也就是第二步,会调用 raftNode 中的 raftNode.node.Propose()
方法把数据交给 raft 核心处理。
// raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{Data: data}})
}
其中的 step
是一个函数指针,根据角色可以是 stepFollower()
、stepCandidate()
、stepLeader()
等不同的函数,当然这些处理都是在 RAFT 核心中完成的。
在启动之后,实际上会在后台运行一个 long running 的协程,也就是 raft/node.go
中的 run()
方法,核心代码的示例如下。
func (n *node) run(r *raft) {
for { // 死循环
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}
}
}
消息先进入 r.msgs
被 newReady()
函数取走,用户代码通过消费 Ready() <-chan Ready
来处理各种消息。
func (rc *raftNode) serveChannels() {
// event loop on raft state machine updates
for {
select {
case <-ticker.C:
rc.node.Tick()
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries) // 保存到持久化存储中
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
// 通过commitC告诉给下游的用户代码
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
rc.node.Advance() // 处理完成需要主动告诉raft
case err := <-rc.transport.ErrorC:
rc.writeError(err)
return
case <-rc.stopc:
rc.stop()
return
}
}
}
// publishEntries writes committed log entries to commit channel and returns
// whether all entries could be published.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
for i := range ents {
switch ents[i].Type {
// 正常的HTTP PUT会触发一个EntryNormal请求
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// ignore empty messages
break
}
s := string(ents[i].Data)
select {
case rc.commitC <- &s:
case <-rc.stopc:
return false
}
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(ents[i].Data)
rc.confState = *rc.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
if len(cc.Context) > 0 {
rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
}
case raftpb.ConfChangeRemoveNode:
if cc.NodeID == uint64(rc.id) {
log.Println("I've been removed from the cluster! Shutting down.")
return false
}
rc.transport.RemovePeer(types.ID(cc.NodeID))
}
}
// after commit, update appliedIndex
rc.appliedIndex = ents[i].Index
// special nil commit to signal replay has finished
if ents[i].Index == rc.lastIndex {
select {
case rc.commitC <- nil:
case <-rc.stopc:
return false
}
}
}
return true
}
RAFT 指令处理
现在客户端的请求通过 proposeC
管道进入了 RAFT 组件,在数据完成同步之后还是通过 Ready()
暴露给应用,然后由应用负责写日志,完成提交,同步给其它的 Follower 节点等。
也就是如何一步步的处理 RAFT 组件内部的指令请求,同样是在 serveChannels()
中启动的协程。
func (rc *raftNode) serveChannels() {
// 上面部分是启动了一个协程处理Propose请求
// event loop on raft state machine updates
for {
select {
case <-ticker.C:
rc.node.Tick()
// 1. 通过Ready()获取到RAFT组件指令
case rd := <-rc.node.Ready():
// 2. 写WAL日志,包含的是当前的状态信息
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
// 3. 这是干什么?
rc.raftStorage.Append(rd.Entries)
// 4. 发送给某个Follower
rc.transport.Send(rd.Messages)
// 5. 将已经commit的日志提交到应用状态机
ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
if !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
// 6. 通知RAFT组件该请求已经处理完成,可以进行下次的请求了
rc.node.Advance()
case err := <-rc.transport.ErrorC:
rc.writeError(err)
return
case <-rc.stopc:
rc.stop()
return
}
}
}
当 RAFT 组件判定已经复制到了多个节点之后,也就是认为已经提交(Commit),此时 RAFT 组件会通过 commitC 管道将请求返回给应用(KVStore),应用收到请求后将其应用到状态机,也就是内存中的 KV 存储。
简单来说,通过 readCommits()
接收用户发送的请求并发送给 RAFT 组件;在 RAFT 组件处理完成提交后,再发送给 serveChannels()
继续处理,保存到应用的 KV 存储中。
定时器
在 raft/raft.go
中定义了 type Config struct
结构体。
type Config struct {
ID uint64 // 本节点的ID,不能为0
peers []uint64 // 当前集群的所有ID列表,目前仅用来测试
learners []uint64 // 集群中的Learner列表,仅用来接收Leader节点发送的消息,不会进行投票选举
ElectionTick int // 也就是选举的超时时间,单位是Node.Tick;当Follower在当前选举周期内没有
// 收到任何消息时开始变成Candidate开始选举
HeartbeatTick int Leader // 为了维持其当前的角色发起的心跳请求
}
一般来说要满足 ElectionTick >> HeartbeatTick
,以防非必要的主切换,一般为 ElectionTick = 10 * HeartbeatTick
。
定时器创建
对于 ETCD 来说,在 newRaftNode() 函数中,会新建一个 ticker 时钟触发器,用来产生时钟事件。示例中,会在 raftNode.serveChannels()
中初始化定时器。
对于时间间隔,默认是保存在 embed/config.go
中的 cfg.TickMs
,当然,也可以通过命令行的入参 --heartbeat-interval
指定。
NewServer() etcdserver/server.go
|-heartbeat 会设置为cfg.TickMs的值,而该值默认在embed/config.go中初始化为100ms
|-newRaftNode() etcdserver/raft.go 在该函数中会将相应的heartbeat的值传入
|-time.NewTicker() 调用time包中提供的函数实现
接着看下这里的配置是如何生效的。
无论是通过 RestartNode()
还是 StartNode()
,最终都会调用 newRaft()
新建一个 raft 对象,其中会将上述的配置分别赋值给 electionTimeout
和 heartbeatTimeout
。
定时器触发
在 raftNode.start()[etcdserver/raft.go]
中,会等待时钟事件的触发,一次也就是一个 Tick 。
每次 Tick 都需要调用 node.Tick()[raft/node.go]
函数,该函数实际上就是向 tickc 中发送一个空的结构体,用来触发一次心跳事件。
为了防止由于负载过高导致时钟事件丢失,会将管道设置为 128 缓冲。
raftNode.start()
| <<<raftNode.ticker.C>>>
|-node.Tick() 触发tick事件,向tickc中发送一个结构体
node.run() raft/node.go
| <<<node.tickc>>> 触发了心跳事件
|-raft.tick() 这里是一个函数指针,不同的角色调用的函数不同
|=== Leader
|-raft.tickHeartBeat() 对于Leader会调用该函数
| 判断是否要发送心跳信息,如果需要则发送MsgBeat类型的消息
日志管理
在实现时,实际上日志 (WAL) 和 Snapshot 已经糅合到了一起,因此在重新构建状态机时必须要两者合作才可以,那么介绍时同样合到一起。
首先需要加载 snapshot 的最新值,然后根据这个 index 在 WAL 目录下查找之后的日志,并回放这些日志即可。
示例使用了 ETCD 提供的通用日志库来进行日志管理,这里重点看下应用层如何使用提供的 WAL 日志模块,其实现后面再详细描述。
日志追加
为了防止数据丢失,在更新之前会先将日志项追加到日志文件中,也就是如下:
func (rc *raftNode) serveChannels() {
......
for {
select {
case <-ticker.C:
rc.node.Tick()
// 正常更新请求,第一步先追加日志
case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries)
......
}
......
}
日志重放
在程序启动时,第一步便是进行日志重放,构建内存状态机。
func (rc *raftNode) replayWAL() *wal.WAL {
snapshot := rc.loadSnapshot()
w := rc.openWAL(snapshot)
_, st, ents, err := w.ReadAll()
if err != nil {
log.Fatalf("raftexample: failed to read WAL (%v)", err)
}
rc.raftStorage = raft.NewMemoryStorage()
if snapshot != nil {
rc.raftStorage.ApplySnapshot(*snapshot)
}
rc.raftStorage.SetHardState(st)
rc.raftStorage.Append(ents)
if len(ents) > 0 {
rc.lastIndex = ents[len(ents)-1].Index
} else {
rc.commitC <- nil
}
return w
}
其它
BugFix
如果直接运行示例会发现日志的格式有所区别。
实际上,在 etcdserver/raft.go
文件中,有定义 init()
函数用于设置默认的 logger,也就是 raft.SetLogger()
的处理。
在 raftexample/raft.go
中增加 init()
函数,然后添加如下内容即可。
func init() {
raft.SetLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "raft"))
}