ETCD 源码解析

2017-12-15 linux golang

在上篇 ETCD 示例源码 中介绍了 ETCD 代码中 RAFT 相关的示例代码,接着介绍与 ETCD 相关的代码。

数据结构

简单介绍下一些常见的数据结构。

type Storage interface

定义了存储的接口。

type Storage interface {
    // InitialState returns the saved HardState and ConfState information.
    InitialState() (pb.HardState, pb.ConfState, error)
    // Entries returns a slice of log entries in the range [lo,hi).
    // MaxSize limits the total size of the log entries returned, but
    // Entries returns at least one entry if any.
    Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
    // Term returns the term of entry i, which must be in the range
    // [FirstIndex()-1, LastIndex()]. The term of the entry before
    // FirstIndex is retained for matching purposes even though the
    // rest of that entry may not be available.
    Term(i uint64) (uint64, error)
    // LastIndex returns the index of the last entry in the log.
    LastIndex() (uint64, error)
    // FirstIndex returns the index of the first log entry that is
    // possibly available via Entries (older entries have been incorporated
    // into the latest Snapshot; if storage only contains the dummy entry the
    // first log entry is not available).
    FirstIndex() (uint64, error)
    // Snapshot returns the most recent snapshot.
    // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
    // so raft state machine could know that Storage needs some time to prepare
    // snapshot and call Snapshot later.
    Snapshot() (pb.Snapshot, error)
}

其中官方提供的 Github Raft Example 中使用的是库自带 MemoryStorage 。

type Ready struct

对于这种 IO 网络密集型的应用,提高吞吐最好的手段就是批量操作,ETCD 与之相关的核心抽象就是 Ready 结构体。

// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
    // The current volatile state of a Node.
    // SoftState will be nil if there is no update.
    // It is not required to consume or store SoftState.
    *SoftState

    // The current state of a Node to be saved to stable storage BEFORE
    // Messages are sent.
    // HardState will be equal to empty state if there is no update.
    pb.HardState

    // ReadStates can be used for node to serve linearizable read requests locally
    // when its applied index is greater than the index in ReadState.
    // Note that the readState will be returned when raft receives msgReadIndex.
    // The returned is only valid for the request that requested to read.
    ReadStates []ReadState

    // Entries specifies entries to be saved to stable storage BEFORE
    // Messages are sent.
    Entries []pb.Entry

    // Snapshot specifies the snapshot to be saved to stable storage.
    Snapshot pb.Snapshot

    // CommittedEntries specifies entries to be committed to a
    // store/state-machine. These have previously been committed to stable
    // store.
    CommittedEntries []pb.Entry

    // Messages specifies outbound messages to be sent AFTER Entries are
    // committed to stable storage.
    // If it contains a MsgSnap message, the application MUST report back to raft
    // when the snapshot has been received or has failed by calling ReportSnapshot.
    Messages []pb.Message

    // MustSync indicates whether the HardState and Entries must be synchronously
    // written to disk or if an asynchronous write is permissible.
    MustSync bool
}

type node struct

raft/node.go 中定义了 type node struct 对应的结构,一个 RAFT 结构通过 Node 表示各结点信息,该结构体内定义了各个管道,用于同步信息,下面会逐一遇到。

type node struct {
    propc      chan pb.Message
    recvc      chan pb.Message
    confc      chan pb.ConfChange
    confstatec chan pb.ConfState
    readyc     chan Ready
    advancec   chan struct{}
    tickc      chan struct{}
    done       chan struct{}
    stop       chan struct{}
    status     chan chan Status
}

其实现,就是通过这些管道在 RAFT 实现与外部应用之间来传递各种消息。

type raft struct

raft/raft.go 中定义了 type raft struct 结构,其中有两个关键函数指针 tickstep,在不同的状态时会调用不同的函数,例如 Follower 中使用 tickElection()stepFollower()

type raft struct {
	id uint64

	Term uint64
	Vote uint64

	readStates []ReadState

	// the log
	raftLog *raftLog

	maxInflight int
	maxMsgSize  uint64
	prs         map[uint64]*Progress

	state StateType

	votes map[uint64]bool

	msgs []pb.Message

	// the leader id
	lead uint64
	// leadTransferee is id of the leader transfer target when its value is not zero.
	// Follow the procedure defined in raft thesis 3.10.
	leadTransferee uint64
	// New configuration is ignored if there exists unapplied configuration.
	pendingConf bool

	readOnly *readOnly

	// number of ticks since it reached last electionTimeout when it is leader
	// or candidate.
	// number of ticks since it reached last electionTimeout or received a
	// valid message from current leader when it is a follower.
	electionElapsed int

	// number of ticks since it reached last heartbeatTimeout.
	// only leader keeps heartbeatElapsed.
	heartbeatElapsed int

	checkQuorum bool
	preVote     bool

	heartbeatTimeout int
	electionTimeout  int
	// randomizedElectionTimeout is a random number between
	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
	// when raft changes its state to follower or candidate.
	randomizedElectionTimeout int

	tick func()          // 两个重要的函数指针
	step stepFunc

	logger Logger
}

Node 代表了 etcd 中一个节点,是 RAFT 协议核心部分实现的代码,而在 EtcdServer 的应用层与之对应的是 raftNode ,两者一对一,raftNode 中有匿名嵌入了 node 。

整体架构

Etcd 服务端主要由几大组件构成,各部分介绍如下:

  • EtcdServer[etcdserver/server.go] 主进程,直接或者间接包含了 raftNode、WAL、snapshotter 等多个核心组件,可以理解为一个容器。
  • raftNode[etcdserver/raft.go] 对内部 RAFT 协议实现的封装,暴露简单的接口,用来保证写事务的集群一致性。

处理流程

这里的采用的是异步状态机,基于 GoLang 的 Channel 机制,RAFT 状态机作为一个 Background Thread/Routine 运行,会通过 Channel 接收上层传来的消息,状态机处理完成之后,再通过 Ready() 接口返回给上层。

其中 type Ready struct 结构体封装了一批更新操作,包括了:

  • pb.HardState 需要在发送消息前持久化的消息,包含当前节点见过的最大的 term,在这个 term 给谁投过票,已经当前节点知道的 commit index;
  • Messages 需要广播给所有 peers 的消息;
  • CommittedEntries 已经提交但是还没有apply到状态机的日志;
  • Snapshot 需要持久化的快照。

库的使用者从 type node struct 结构体提供的 ready channel 中不断 pop 出一个个 Ready 进行处理,库使用者通过如下方法拿到 Ready channel 。

func (n *node) Ready() <-chan Ready { return n.readyc }

应用需要对 Ready 的处理包括:

  1. 将 HardState、Entries、Snapshot 持久化到 storage;
  2. 将 Messages 非阻塞的广播给其他 peers;
  3. 将 CommittedEntries (已经提交但是还没有应用的日志) 应用到状态机;
  4. 如果发现 CommittedEntries 中有成员变更类型的 entry,则调用 node 的 ApplyConfChange() 方法让 node 知道;
  5. 调用 Node.Advance() 告诉 raft node 这批状态更新处理完,状态已经演进了,可以给我下一批 Ready 让我处理了。

注意,上述的第 4 部分和 RAFT 论文中的内容有所区别,论文中只要节点收到了成员变更日志就应用,而这里实际需要等到日志提交之后才会应用。

启动流程

ETCD 服务器是通过 EtcdServer 结构抽象,对应了 etcdserver/server.go 中的代码,包含属性 r raftNode,代表 RAFT 集群中的一个节点,启动入口在 etcdmain/main.go 文件中。

main()                                etcdmain/main.go
 |-checkSupportArch()
 |-startEtcdOrProxyV2()               etcdmain/etcd.go
   |-newConfig()
   |-setupLogging()
   |-startEtcd()
   | |-embed.StartEtcd()              embed/etcd.go
   |   |-startPeerListeners()
   |   |-startClientListeners()
   |   |-EtcdServer.ServerConfig()    生成新的配置
   |   |-EtcdServer.NewServer()       etcdserver/server.go正式启动RAFT服务<<<1>>>
   |   |-EtcdServer.Start()           开始启动服务
   |   | |-EtcdServer.start()
   |   |   |-wait.New()               新建WaitGroup组以及一些管道服务
   |   |   |-EtcdServer.run()         etcdserver/raft.go 启动应用层的处理协程<<<2>>>
   |   |-Etcd.servePeers()            启动集群内部通讯
   |   | |-etcdhttp.NewPeerHandler()  启动http服务
   |   | |-v3rpc.Server()             启动gRPC服务 api/v3rpc/grpc.go
   |   |   |-grpc.NewServer()         调用gRPC的接口创建
   |   |   |-pb.RegisterKVServer()    注册各种的服务,这里包含了多个
   |   |   |-pb.RegisterWatchServer()
   |   |-Etcd.serveClients()          启动协程处理客户请求
   |   |-Etcd.serveMetrics()
   |-notifySystemd()
   |-select()                         等待stopped
   |-osutil.Exit()

在标记 1 处会启动 RAFT 协议的核心部分,也就是 node.run()[raft/node.go]

在标记 2 处启动的是 ETCD 应用层的处理协程,对应了 raftNode.start()[etcdserver/raft.go]

这里基本上是大致的启动流程,主要是解析参数,设置日志,启动监听端口等,接下来就是其核心部分 etcdserver.NewServer()

启动RAFT

应用通过 raft.StartNode() 来启动 raft 中的一个副本,函数内部会通过启动一个 goroutine 运行。

NewServer()                           etcdserver/server.go 通过配置创建一个新的EtcdServer对象,不同场景不同
 |-store.New()
 |-wal.Exist()
 |-restartNode()                      etcdserver/raft.go 已有WAL,直接根据SnapShot启动,最常见场景
 | |-readWAL()                        读取WAL
 | |-NewCluster()                     每个会对应一个新的集群配置
 | |-raft.RestartNode()               raft/node.go 真正做重启节点的函数
 |   |-newRaft()                      raft/raft.go 新建一个type raft struct对象
 |   | |-raft.becomeFollower()        成为Follower状态
 |   |-newNode()                      raft/node.go 新建一个type node struct对象
 |   |-node.run()                     raft/node.go RAFT协议运行的核心函数,会单独启动一个协程<<<1>>>
 |-NewAuthStore()
 |                                    <====会根据不同的启动场景执行相关任务
 |-startNode()                        新建一个节点,前提是没有WAL日志,且是新配置结点 etcdserver/raft.go
 | |-raft.NewMemoryStorage()
 | |-raft.StartNode()                 启动一个节点raft/node.go,开始node的处理过程<<<start>>>
 |   |-newRaft()                      创建RAFT对象raft/raft.go
 |   |-raft.becomeFollower()          这里会对关键对象初始化以及赋值,包括step=stepFollower r.tick=r.tickElection函数
 |   | |-raft.reset()                 开始启动时设置term为1
 |   | | |-raft.resetRandomizedElectionTimeout() 更新选举的随机超时时间
 |   |-raftLog.append()               将配置更新日志添加
 |   |-raft.addNode()
 |   |-newNode()                      新建节点
 |   |-node.run()                     raft/node.go 节点运行,会启动一个协程运行 <<<long running>>>
 |     |-newReady()                   新建type Ready对象
 |     |-raft.tick()                  等待n.tickc管道,这里实际就是在上面赋值的tickElection()函数
 |
 |-time.NewTicker()                   在通过&EtcdServer{}创建时新建tick时钟 etcdserver/server.go

启动的后台程序也就是 node.run()

客户端发送请求

这里是通过 clientv3 发送数据,该端口使用的是 gRPC 通讯,关于客户端的使用方式,可以参考代码 clientv3 目录下的 example 示例,例如 example_kv_test.go

clientv3.New()                 clientv3/client.go
 |-newClient()
 | |-Client{}                  示例化Client对象
 | |-newHealthBalancer()       etcd实现的负载均衡策略
 | |-Client.dial()             开始建立链接
 | | |-Client.dialSetupOpts()
 | | |-grpc.DialContext()      真正建立链接
 | |-NewCluster()              新建集群配置
 | |-NewKV()                   其入参是上述创建的Client
 |   |-RetryKVClient()         新建KV对象时指定了remote参数<<<1>>>
 |     |-NewKVClient()         调用proto生成的函数接口建立链接
 |-client.autoSync()           单独协程开启自动重连

cli.Put()                      实际上对应了kv.go中的实现
 |-kv.Put()                    kv.go 中的实现
   |-kv.Do()                   调用该函数实现,统一实现接口,根据类型调用不同的接口
     | <<<tPut>>>
     |-pb.PutRequest{}         构造proto中指定的请求
     | |-kv.remote.Put()       在如上新建客户端时,将kv.remote设置为了RetryKVClient()返回值
     |=retryKVClient.Put()     上述调用实际上就是该函数
       |-rkv.kc.Put()          最终的gRPC调用接口,发送请求并处理返回值

上述的最终调用,在外层会封装一个 retryf() 函数,也就是如果有异常会直接重试。

服务端处理请求

服务器 RPC 接口的定义在 etcdserver/etcdserverpb/rpc.proto 文件中,对应了 service KV 中的定义,而真正的启动对应了 api/v3rpc/grpc.go 中的实现。

以 KV 存储为例,其对应了 NewQuotaKVServer() 中的实现,这里实际上是封装了一层,用来检查是否有足够的空间。

Put

例如,对于 Put 请求,对应了该函数中的实现。

quotaKVServer.Put() api/v3rpc/quota.go 首先检查是否满足需求
 |-quotoAlarm.check() 检查
 |-KVServer.Put() api/v3rpc/key.go 真正的处理请求
   |-checkPutRequest() 校验请求参数是否合法
   |-RaftKV.Put() etcdserver/v3_server.go 处理请求
   |=EtcdServer.Put() 实际调用的是该函数
   | |-raftRequest()
   |   |-raftRequestOnce()
   |     |-processInternalRaftRequestOnce() 真正开始处理请求
   |       |-context.WithTimeout() 创建超时的上下文信息
   |       |-raftNode.Propose() raft/node.go
   |         |-raftNode.step() 对于类型为MsgProp类型消息,向propc通道中传入数据
   |-header.fill() etcdserver/api/v3rpc/header.go填充响应的头部信息

此时,实际上已经将添加记录的请求发送到了 RAFT 协议的核心层处理。

Range

没有操作单个 key 的方法,即使是读取单个 key,也是需要使用 Range 方法。

上述的 quota 检查实际上只针对了 Put Txn 操作,其它的请求,例如 Range 实际上会直接调用 api/v3rpc/key.go 中的实现。

kvServer.Range() api/v3rpc/key.go
 |-checkRangeRequest()
 |-RaftKV.Range()
 |-header.fill()

日志复制

在 RAFT 协议中,整个集群所有变更都必须通过 Leader 发起,如上其入口为 node.Propose()

func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{Data: data}})
}

这里消息类型是 pb.MsgProp ,对于 leader 来说,实际上调用的是 stepLeader() 函数。

case pb.MsgProp:
	r.appendEntry(m.Entries...)
	r.bcastAppend()
	return

RAFT 核心处理

状态机简介

在 RAFT 协议实现的代码中,node[raft/node.go] 是其核心的实现,也是整个分布式算法的核心所在。

另外,通过 raftNode[etcdserver/raft.go] 对 node 进一步封装,只对 EtcdServer 暴露了 startNode()start()apply()processMessages() 等少数几个接口。

其中核心部分是通过 start() 方法启动的一个协程,这里会等待从 readyc 通道上报的数据。

状态机处理

如上,在添加数据时,已经添加到了 propc 管道中,此时会触发 node.run()[raft/node.go] 中协程。

node.run()                       raft/node.go 单独的协程调用
 |-newReady()                    获取已经就绪的数据,也就是msgs []pb.Message中的数据,保存到了rd.Messages中
 |-Ready.containsUpdates()       判断是否有更新,以决定是否将数据发送到readyc的管道中
 |-hasLeader()                   如果leader已经变化,那么需要获取最新的propc管道
 |                               等待propc获取数据
 |-raft.Step()                   raft/raft.go
   |-raft.step()                 这里是一个函数指针,不同状态调用函数有所区别
   |=stepLeader()                对于Leader来说,也就是同步到其它节点
   | |
   | |  <<<pb.MsgProp>>>
   | |-raft.appendEntry()        将日志添加到raftlog的unstable entry中,等待commit变成stable entry
   | | |                             放到storage中,最终变成snapshot
   | | |-raftLog.lastIndex()     raft/log.go 日志最新的ID,并对每个消息赋值ID
   | | |-raftLog.append()        将新的entry加到unstable entry中
   | | | |-unstable.truncateAndAppend()
   | | |-raft.getProgress().maybeUpdate() 这里更新了leader自己的Match
   | | |-raft.maybeCommit()      只增加了自己的commit,未收到其它节点的返回消息,此时不会更新commit index
   | |   |-raftLog.maybeCommit() 会读取raft.prs中的内容,也就是Progress
   | |     |-raftLog.commitTo()  修改commitIndex
   | |
   | |-raft.bcastAppend()        将entry广播到其它的节点,也就是日志复制
   | | |-raft.sendAppend()       构造pb.MsgApp类型的消息结构体开始发送
   | |   |-raft.send()
   | |     |-append()            添加到msgs []pb.Message中,这里相当于一个发送的缓冲区
   | |
   | |  <<<pb.MsgAppResp>>>
   | |-maybeUpdate()             从其它节点获取到的响应消息,更新本地计数
   | |-raft.maybeCommit()        判断是否提交成功,如果更新成功则广播
   | |-raft.bcastAppend()
   |
   |=stepFollower()              对于Follower来说
     |
     |  <<<pb.MsgProp>>>
     |-raft.send()               直接添加到msgs []pb.Message数组中并转发给Leader
     |
     |  <<<pb.MsgApp>>>
     |-raft.handleAppendEntries()
       |-raft.send()

注意,这里在处理时,readyc 和 advancec 只有一个是有效值。

if advancec != nil { /* 如果 advance 不空,则把 readyc 置空 */
	readyc = nil
} else { /* 每次循环都会创建一个新的ready对象,其中包含了数据msgs */
	rd = newReady(r, prevSoftSt, prevHardSt)
	if rd.containsUpdates() { /* 如果raft.msgs中队列大小不为0,表示有数据发出 */
		readyc = n.readyc
	} else {
		readyc = nil
	}
}

if lead != r.lead {
	if r.hasLeader() {//当前raft节点r中lead不为空,表示已经存在leader
		if lead == None {
			r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
		} else {
			r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", 
					    r.id, lead, r.lead, r.Term)
		}
		propc = n.propc
	} else {
		r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
		propc = nil
	}
	lead = r.lead
}

在下个循环中,会通过 newReady() 读取数据,也就是 msgs []pb.Message 中的数据,并发送到 readyc 管道中。接着会触发消息的发送,也就是 raftNode.start()[etcdserver/raft.go] 中的处理。

raftNode.start() etcdserver/raft.go 单独协程处理,包括发送消息
 |
 | <<<readyc>>>                这里会等待raft/node.go中node.Ready()返回的管道
 |-rd := <- r.Ready()          阻塞等待readyc管道中的消息,包括上述提交的数据
 |-apply{}                     构造apply对象,其中包括了已经提交的日志,SnapShot等
 |-updateCommittedIndex()
 | |-raftReadyHandler.updateCommittedIndex()
 | applyc<-ap                  添加到管道中,并等待提交完成
 |-transport.Send()            将数据,真正发送到对端
 |
 |-raftNode.processMessages()  会根据不同类型的消息进行一些异常的处理
 |-transport.Send()            rafthttp/transport.go 发送请求消息
 |-storage.Save()              这里同时会对日志以及SnapShot进行持久化处理

在消息通过 append(r.msgs, m) 添加到了发送缓冲区中之后,接着就是如何通过网络层发送数据。

在搜索 r.msgs 是,实际用的只有在 newReady() 函数中,也就是上述的处理协程中,对应了 node.run() 函数,此时会发送到 readyc 管道中。

其中,raft/node.go 中有如下的实现。

func (n *node) Ready() <-chan Ready { return n.readyc }

也就是是说,实际处理 readyc 请求是在 raftNode.start()[etcdserver/raft.go] 中。

消息发送

一般在 raft/raft.go 文件中,会通过 r.send() 发送,也就是 raft.send() 发送消息时,例如,如下是处于 Follower 状态时的处理函数 stepFollower()

func stepFollower(r *raft, m pb.Message) {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal",
				r.id, r.lead, r.Term)
			return
		}
		m.To = r.lead
		r.send(m)
	// ... ...
	}
}

在同一个文件中,最终会调用 append(r.msgs, m),那么这个消息是在什么时候消费的呢?

type node struct 结构体中,存在一个 readyc 的管道。

type node struct {
	readyc chan Ready
}

raft/node.go 中存在一个 node.run() 函数,会读取所有的消息,然后同时通过管道发送。

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
	    Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	... ...
}

也就是说,在 node.gonode.run() 中构建了 Ready 对象,对象里就包涵被赋值的 msgs,并最终写到 node.readyc 这个管道里,如下是对应这个 case 的实现:

case readyc <- rd:
	r.msgs = nil
	r.readStates = nil
	advancec = n.advancec

这里的 msgs 已经读取过并写入到了管道中,直接设置为空,并会赋值 advancec,在 etcdserver/raft.goraftNode.start() 中,会起一个单独的协程读取数据;其中读取的函数实现在 raft/node.go 中:

func (n *node) Ready() <-chan Ready { return n.readyc }

应用层 (也就是etcd) 读取到的 Ready 里面包含了 Vote 消息,会调用网络层发送消息出去,并且调用 Advance() 。

raft/node.go->run() 函数中,是一个节点 (Node) 的主要处理过程,开始处于 Follower 状态,然后随着 case <-n.tickc 进行,开始进入选举。

Progress

RAFT 实现的内部,本身还维护了一个子状态机。

                            +--------------------------------------------------------+
                            |                  send snapshot                         |
                            |                                                        |
                  +---------+----------+                                  +----------v---------+
              +--->       probe        |                                  |      snapshot      |
              |   |  max inflight = 1  <----------------------------------+  max inflight = 0  |
              |   +---------+----------+                                  +--------------------+
              |             |            1. snapshot success
              |             |               (next=snapshot.index + 1)
              |             |            2. snapshot failure
              |             |               (no change)
              |             |            3. receives msgAppResp(rej=false&&index>lastsnap.index)
              |             |               (match=m.index,next=match+1)
receives msgAppResp(rej=true)
(next=match+1)|             |
              |             |
              |             |
              |             |   receives msgAppResp(rej=false&&index>match)
              |             |   (match=m.index,next=match+1)
              |             |
              |             |
              |             |
              |   +---------v----------+
              |   |     replicate      |
              +---+  max inflight = n  |
                  +--------------------+

详细可以查看 raft/design.md 中的介绍,对于 Progress 实际上就是 Leader 维护的各个 Follower 的状态信息,总共分为三种状态:probe, replicate, snapshot 。

应该是 AppendEntries 接口的一种实现方式,为每个节点维护两个 Index 信息:A) matchIndex 已知服务器的最新 Index,如果还未确定则是 0; B) nextIndex 用来标示需要从那个索引开始复制。那么 Leader 实际上就是将 nextIndex 到最新的日志复制到 Follower 节点。

参考

两种不同的实现方式 Github CoreOS-etcdGithub Hashicorp-raft