在上篇 ETCD 示例源码 中介绍了 ETCD 代码中 RAFT 相关的示例代码,接着介绍与 ETCD 相关的代码。
数据结构
简单介绍下一些常见的数据结构。
type Storage interface
定义了存储的接口。
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
Snapshot() (pb.Snapshot, error)
}
其中官方提供的 Github Raft Example 中使用的是库自带 MemoryStorage 。
type Ready struct
对于这种 IO 网络密集型的应用,提高吞吐最好的手段就是批量操作,ETCD 与之相关的核心抽象就是 Ready 结构体。
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
type node struct
在 raft/node.go
中定义了 type node struct
对应的结构,一个 RAFT 结构通过 Node 表示各结点信息,该结构体内定义了各个管道,用于同步信息,下面会逐一遇到。
type node struct {
propc chan pb.Message
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
}
其实现,就是通过这些管道在 RAFT 实现与外部应用之间来传递各种消息。
type raft struct
在 raft/raft.go
中定义了 type raft struct
结构,其中有两个关键函数指针 tick
和 step
,在不同的状态时会调用不同的函数,例如 Follower 中使用 tickElection()
和 stepFollower()
。
type raft struct {
id uint64
Term uint64
Vote uint64
readStates []ReadState
// the log
raftLog *raftLog
maxInflight int
maxMsgSize uint64
prs map[uint64]*Progress
state StateType
votes map[uint64]bool
msgs []pb.Message
// the leader id
lead uint64
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
readOnly *readOnly
// number of ticks since it reached last electionTimeout when it is leader
// or candidate.
// number of ticks since it reached last electionTimeout or received a
// valid message from current leader when it is a follower.
electionElapsed int
// number of ticks since it reached last heartbeatTimeout.
// only leader keeps heartbeatElapsed.
heartbeatElapsed int
checkQuorum bool
preVote bool
heartbeatTimeout int
electionTimeout int
// randomizedElectionTimeout is a random number between
// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
// when raft changes its state to follower or candidate.
randomizedElectionTimeout int
tick func() // 两个重要的函数指针
step stepFunc
logger Logger
}
Node 代表了 etcd 中一个节点,是 RAFT 协议核心部分实现的代码,而在 EtcdServer 的应用层与之对应的是 raftNode ,两者一对一,raftNode 中有匿名嵌入了 node 。
整体架构
Etcd 服务端主要由几大组件构成,各部分介绍如下:
EtcdServer[etcdserver/server.go]
主进程,直接或者间接包含了 raftNode、WAL、snapshotter 等多个核心组件,可以理解为一个容器。raftNode[etcdserver/raft.go]
对内部 RAFT 协议实现的封装,暴露简单的接口,用来保证写事务的集群一致性。
处理流程
这里的采用的是异步状态机,基于 GoLang 的 Channel 机制,RAFT 状态机作为一个 Background Thread/Routine 运行,会通过 Channel 接收上层传来的消息,状态机处理完成之后,再通过 Ready() 接口返回给上层。
其中 type Ready struct
结构体封装了一批更新操作,包括了:
pb.HardState
需要在发送消息前持久化的消息,包含当前节点见过的最大的 term,在这个 term 给谁投过票,已经当前节点知道的 commit index;Messages
需要广播给所有 peers 的消息;CommittedEntries
已经提交但是还没有apply到状态机的日志;Snapshot
需要持久化的快照。
库的使用者从 type node struct
结构体提供的 ready channel 中不断 pop 出一个个 Ready 进行处理,库使用者通过如下方法拿到 Ready channel 。
func (n *node) Ready() <-chan Ready { return n.readyc }
应用需要对 Ready 的处理包括:
- 将 HardState、Entries、Snapshot 持久化到 storage;
- 将 Messages 非阻塞的广播给其他 peers;
- 将 CommittedEntries (已经提交但是还没有应用的日志) 应用到状态机;
- 如果发现 CommittedEntries 中有成员变更类型的 entry,则调用 node 的
ApplyConfChange()
方法让 node 知道; - 调用
Node.Advance()
告诉 raft node 这批状态更新处理完,状态已经演进了,可以给我下一批 Ready 让我处理了。
注意,上述的第 4 部分和 RAFT 论文中的内容有所区别,论文中只要节点收到了成员变更日志就应用,而这里实际需要等到日志提交之后才会应用。
启动流程
ETCD 服务器是通过 EtcdServer 结构抽象,对应了 etcdserver/server.go
中的代码,包含属性 r raftNode
,代表 RAFT 集群中的一个节点,启动入口在 etcdmain/main.go
文件中。
main() etcdmain/main.go
|-checkSupportArch()
|-startEtcdOrProxyV2() etcdmain/etcd.go
|-newConfig()
|-setupLogging()
|-startEtcd()
| |-embed.StartEtcd() embed/etcd.go
| |-startPeerListeners()
| |-startClientListeners()
| |-EtcdServer.ServerConfig() 生成新的配置
| |-EtcdServer.NewServer() etcdserver/server.go正式启动RAFT服务<<<1>>>
| |-EtcdServer.Start() 开始启动服务
| | |-EtcdServer.start()
| | |-wait.New() 新建WaitGroup组以及一些管道服务
| | |-EtcdServer.run() etcdserver/raft.go 启动应用层的处理协程<<<2>>>
| |-Etcd.servePeers() 启动集群内部通讯
| | |-etcdhttp.NewPeerHandler() 启动http服务
| | |-v3rpc.Server() 启动gRPC服务 api/v3rpc/grpc.go
| | |-grpc.NewServer() 调用gRPC的接口创建
| | |-pb.RegisterKVServer() 注册各种的服务,这里包含了多个
| | |-pb.RegisterWatchServer()
| |-Etcd.serveClients() 启动协程处理客户请求
| |-Etcd.serveMetrics()
|-notifySystemd()
|-select() 等待stopped
|-osutil.Exit()
在标记 1 处会启动 RAFT 协议的核心部分,也就是 node.run()[raft/node.go]
。
在标记 2 处启动的是 ETCD 应用层的处理协程,对应了 raftNode.start()[etcdserver/raft.go]
。
这里基本上是大致的启动流程,主要是解析参数,设置日志,启动监听端口等,接下来就是其核心部分 etcdserver.NewServer()
。
启动RAFT
应用通过 raft.StartNode()
来启动 raft 中的一个副本,函数内部会通过启动一个 goroutine 运行。
NewServer() etcdserver/server.go 通过配置创建一个新的EtcdServer对象,不同场景不同
|-store.New()
|-wal.Exist()
|-restartNode() etcdserver/raft.go 已有WAL,直接根据SnapShot启动,最常见场景
| |-readWAL() 读取WAL
| |-NewCluster() 每个会对应一个新的集群配置
| |-raft.RestartNode() raft/node.go 真正做重启节点的函数
| |-newRaft() raft/raft.go 新建一个type raft struct对象
| | |-raft.becomeFollower() 成为Follower状态
| |-newNode() raft/node.go 新建一个type node struct对象
| |-node.run() raft/node.go RAFT协议运行的核心函数,会单独启动一个协程<<<1>>>
|-NewAuthStore()
| <====会根据不同的启动场景执行相关任务
|-startNode() 新建一个节点,前提是没有WAL日志,且是新配置结点 etcdserver/raft.go
| |-raft.NewMemoryStorage()
| |-raft.StartNode() 启动一个节点raft/node.go,开始node的处理过程<<<start>>>
| |-newRaft() 创建RAFT对象raft/raft.go
| |-raft.becomeFollower() 这里会对关键对象初始化以及赋值,包括step=stepFollower r.tick=r.tickElection函数
| | |-raft.reset() 开始启动时设置term为1
| | | |-raft.resetRandomizedElectionTimeout() 更新选举的随机超时时间
| |-raftLog.append() 将配置更新日志添加
| |-raft.addNode()
| |-newNode() 新建节点
| |-node.run() raft/node.go 节点运行,会启动一个协程运行 <<<long running>>>
| |-newReady() 新建type Ready对象
| |-raft.tick() 等待n.tickc管道,这里实际就是在上面赋值的tickElection()函数
|
|-time.NewTicker() 在通过&EtcdServer{}创建时新建tick时钟 etcdserver/server.go
启动的后台程序也就是 node.run()
。
客户端发送请求
这里是通过 clientv3
发送数据,该端口使用的是 gRPC 通讯,关于客户端的使用方式,可以参考代码 clientv3 目录下的 example 示例,例如 example_kv_test.go
。
clientv3.New() clientv3/client.go
|-newClient()
| |-Client{} 示例化Client对象
| |-newHealthBalancer() etcd实现的负载均衡策略
| |-Client.dial() 开始建立链接
| | |-Client.dialSetupOpts()
| | |-grpc.DialContext() 真正建立链接
| |-NewCluster() 新建集群配置
| |-NewKV() 其入参是上述创建的Client
| |-RetryKVClient() 新建KV对象时指定了remote参数<<<1>>>
| |-NewKVClient() 调用proto生成的函数接口建立链接
|-client.autoSync() 单独协程开启自动重连
cli.Put() 实际上对应了kv.go中的实现
|-kv.Put() kv.go 中的实现
|-kv.Do() 调用该函数实现,统一实现接口,根据类型调用不同的接口
| <<<tPut>>>
|-pb.PutRequest{} 构造proto中指定的请求
| |-kv.remote.Put() 在如上新建客户端时,将kv.remote设置为了RetryKVClient()返回值
|=retryKVClient.Put() 上述调用实际上就是该函数
|-rkv.kc.Put() 最终的gRPC调用接口,发送请求并处理返回值
上述的最终调用,在外层会封装一个 retryf()
函数,也就是如果有异常会直接重试。
服务端处理请求
服务器 RPC 接口的定义在 etcdserver/etcdserverpb/rpc.proto
文件中,对应了 service KV
中的定义,而真正的启动对应了 api/v3rpc/grpc.go
中的实现。
以 KV 存储为例,其对应了 NewQuotaKVServer()
中的实现,这里实际上是封装了一层,用来检查是否有足够的空间。
Put
例如,对于 Put 请求,对应了该函数中的实现。
quotaKVServer.Put() api/v3rpc/quota.go 首先检查是否满足需求
|-quotoAlarm.check() 检查
|-KVServer.Put() api/v3rpc/key.go 真正的处理请求
|-checkPutRequest() 校验请求参数是否合法
|-RaftKV.Put() etcdserver/v3_server.go 处理请求
|=EtcdServer.Put() 实际调用的是该函数
| |-raftRequest()
| |-raftRequestOnce()
| |-processInternalRaftRequestOnce() 真正开始处理请求
| |-context.WithTimeout() 创建超时的上下文信息
| |-raftNode.Propose() raft/node.go
| |-raftNode.step() 对于类型为MsgProp类型消息,向propc通道中传入数据
|-header.fill() etcdserver/api/v3rpc/header.go填充响应的头部信息
此时,实际上已经将添加记录的请求发送到了 RAFT 协议的核心层处理。
Range
没有操作单个 key 的方法,即使是读取单个 key,也是需要使用 Range 方法。
上述的 quota 检查实际上只针对了 Put Txn 操作,其它的请求,例如 Range 实际上会直接调用 api/v3rpc/key.go
中的实现。
kvServer.Range() api/v3rpc/key.go
|-checkRangeRequest()
|-RaftKV.Range()
|-header.fill()
日志复制
在 RAFT 协议中,整个集群所有变更都必须通过 Leader 发起,如上其入口为 node.Propose()
。
func (n *node) Propose(ctx context.Context, data []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{Data: data}})
}
这里消息类型是 pb.MsgProp
,对于 leader 来说,实际上调用的是 stepLeader()
函数。
case pb.MsgProp:
r.appendEntry(m.Entries...)
r.bcastAppend()
return
RAFT 核心处理
状态机简介
在 RAFT 协议实现的代码中,node[raft/node.go]
是其核心的实现,也是整个分布式算法的核心所在。
另外,通过 raftNode[etcdserver/raft.go]
对 node 进一步封装,只对 EtcdServer 暴露了 startNode()
、start()
、apply()
、processMessages()
等少数几个接口。
其中核心部分是通过 start()
方法启动的一个协程,这里会等待从 readyc 通道上报的数据。
状态机处理
如上,在添加数据时,已经添加到了 propc 管道中,此时会触发 node.run()[raft/node.go]
中协程。
node.run() raft/node.go 单独的协程调用
|-newReady() 获取已经就绪的数据,也就是msgs []pb.Message中的数据,保存到了rd.Messages中
|-Ready.containsUpdates() 判断是否有更新,以决定是否将数据发送到readyc的管道中
|-hasLeader() 如果leader已经变化,那么需要获取最新的propc管道
| 等待propc获取数据
|-raft.Step() raft/raft.go
|-raft.step() 这里是一个函数指针,不同状态调用函数有所区别
|=stepLeader() 对于Leader来说,也就是同步到其它节点
| |
| | <<<pb.MsgProp>>>
| |-raft.appendEntry() 将日志添加到raftlog的unstable entry中,等待commit变成stable entry
| | | 放到storage中,最终变成snapshot
| | |-raftLog.lastIndex() raft/log.go 日志最新的ID,并对每个消息赋值ID
| | |-raftLog.append() 将新的entry加到unstable entry中
| | | |-unstable.truncateAndAppend()
| | |-raft.getProgress().maybeUpdate() 这里更新了leader自己的Match
| | |-raft.maybeCommit() 只增加了自己的commit,未收到其它节点的返回消息,此时不会更新commit index
| | |-raftLog.maybeCommit() 会读取raft.prs中的内容,也就是Progress
| | |-raftLog.commitTo() 修改commitIndex
| |
| |-raft.bcastAppend() 将entry广播到其它的节点,也就是日志复制
| | |-raft.sendAppend() 构造pb.MsgApp类型的消息结构体开始发送
| | |-raft.send()
| | |-append() 添加到msgs []pb.Message中,这里相当于一个发送的缓冲区
| |
| | <<<pb.MsgAppResp>>>
| |-maybeUpdate() 从其它节点获取到的响应消息,更新本地计数
| |-raft.maybeCommit() 判断是否提交成功,如果更新成功则广播
| |-raft.bcastAppend()
|
|=stepFollower() 对于Follower来说
|
| <<<pb.MsgProp>>>
|-raft.send() 直接添加到msgs []pb.Message数组中并转发给Leader
|
| <<<pb.MsgApp>>>
|-raft.handleAppendEntries()
|-raft.send()
注意,这里在处理时,readyc 和 advancec 只有一个是有效值。
if advancec != nil { /* 如果 advance 不空,则把 readyc 置空 */
readyc = nil
} else { /* 每次循环都会创建一个新的ready对象,其中包含了数据msgs */
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() { /* 如果raft.msgs中队列大小不为0,表示有数据发出 */
readyc = n.readyc
} else {
readyc = nil
}
}
if lead != r.lead {
if r.hasLeader() {//当前raft节点r中lead不为空,表示已经存在leader
if lead == None {
r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
} else {
r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d",
r.id, lead, r.lead, r.Term)
}
propc = n.propc
} else {
r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
propc = nil
}
lead = r.lead
}
在下个循环中,会通过 newReady()
读取数据,也就是 msgs []pb.Message
中的数据,并发送到 readyc 管道中。接着会触发消息的发送,也就是 raftNode.start()[etcdserver/raft.go]
中的处理。
raftNode.start() etcdserver/raft.go 单独协程处理,包括发送消息
|
| <<<readyc>>> 这里会等待raft/node.go中node.Ready()返回的管道
|-rd := <- r.Ready() 阻塞等待readyc管道中的消息,包括上述提交的数据
|-apply{} 构造apply对象,其中包括了已经提交的日志,SnapShot等
|-updateCommittedIndex()
| |-raftReadyHandler.updateCommittedIndex()
| applyc<-ap 添加到管道中,并等待提交完成
|-transport.Send() 将数据,真正发送到对端
|
|-raftNode.processMessages() 会根据不同类型的消息进行一些异常的处理
|-transport.Send() rafthttp/transport.go 发送请求消息
|-storage.Save() 这里同时会对日志以及SnapShot进行持久化处理
在消息通过 append(r.msgs, m)
添加到了发送缓冲区中之后,接着就是如何通过网络层发送数据。
在搜索 r.msgs
是,实际用的只有在 newReady()
函数中,也就是上述的处理协程中,对应了 node.run()
函数,此时会发送到 readyc 管道中。
其中,raft/node.go 中有如下的实现。
func (n *node) Ready() <-chan Ready { return n.readyc }
也就是是说,实际处理 readyc 请求是在 raftNode.start()[etcdserver/raft.go]
中。
消息发送
一般在 raft/raft.go
文件中,会通过 r.send()
发送,也就是 raft.send()
发送消息时,例如,如下是处于 Follower 状态时的处理函数 stepFollower()
。
func stepFollower(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
} else if r.disableProposalForwarding {
r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal",
r.id, r.lead, r.Term)
return
}
m.To = r.lead
r.send(m)
// ... ...
}
}
在同一个文件中,最终会调用 append(r.msgs, m)
,那么这个消息是在什么时候消费的呢?
在 type node struct
结构体中,存在一个 readyc 的管道。
type node struct {
readyc chan Ready
}
在 raft/node.go
中存在一个 node.run()
函数,会读取所有的消息,然后同时通过管道发送。
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
... ...
}
也就是说,在 node.go
里 node.run()
中构建了 Ready 对象,对象里就包涵被赋值的 msgs,并最终写到 node.readyc
这个管道里,如下是对应这个 case 的实现:
case readyc <- rd:
r.msgs = nil
r.readStates = nil
advancec = n.advancec
这里的 msgs 已经读取过并写入到了管道中,直接设置为空,并会赋值 advancec,在 etcdserver/raft.go
的 raftNode.start()
中,会起一个单独的协程读取数据;其中读取的函数实现在 raft/node.go 中:
func (n *node) Ready() <-chan Ready { return n.readyc }
应用层 (也就是etcd) 读取到的 Ready 里面包含了 Vote 消息,会调用网络层发送消息出去,并且调用 Advance() 。
在 raft/node.go->run()
函数中,是一个节点 (Node) 的主要处理过程,开始处于 Follower 状态,然后随着 case <-n.tickc
进行,开始进入选举。
Progress
RAFT 实现的内部,本身还维护了一个子状态机。
+--------------------------------------------------------+
| send snapshot |
| |
+---------+----------+ +----------v---------+
+---> probe | | snapshot |
| | max inflight = 1 <----------------------------------+ max inflight = 0 |
| +---------+----------+ +--------------------+
| | 1. snapshot success
| | (next=snapshot.index + 1)
| | 2. snapshot failure
| | (no change)
| | 3. receives msgAppResp(rej=false&&index>lastsnap.index)
| | (match=m.index,next=match+1)
receives msgAppResp(rej=true)
(next=match+1)| |
| |
| |
| | receives msgAppResp(rej=false&&index>match)
| | (match=m.index,next=match+1)
| |
| |
| |
| +---------v----------+
| | replicate |
+---+ max inflight = n |
+--------------------+
详细可以查看 raft/design.md 中的介绍,对于 Progress 实际上就是 Leader 维护的各个 Follower 的状态信息,总共分为三种状态:probe, replicate, snapshot 。
应该是 AppendEntries 接口的一种实现方式,为每个节点维护两个 Index 信息:A) matchIndex 已知服务器的最新 Index,如果还未确定则是 0; B) nextIndex 用来标示需要从那个索引开始复制。那么 Leader 实际上就是将 nextIndex 到最新的日志复制到 Follower 节点。
参考
两种不同的实现方式 Github CoreOS-etcd、Github Hashicorp-raft 。