如前所述,ETCD 中 RAFT 协议的只是实现了其核心的部分,而其中的存储模块需要单独实现。
Storage
应用程序需要实现存储 IO 和网络通讯,其中存储在 RAFT 中通过 type Storage interface
定义,包括了读取 log、执行 Snapshot 等接口。
其本身实现了基于内存的 MemoryStorage[raft/storage.go]
,ETCD 将其作为 Cache 使用,每次事务中会先将日志持久化到存储设备上,然后再更新 MemoryStorage 。
type Storage interface {
// 初始化时会返回持久化之后的HardState和ConfState
InitialState() (pb.HardState, pb.ConfState, error)
// 返回范围[lo,hi)内的日志数据
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// 获取entry i的term值
Term(i uint64) (uint64, error)
// 日志中最新一条日志的序号
LastIndex() (uint64, error)
// 日志中的第一条日志序号,老的日志已经保存到snapshot中
FirstIndex() (uint64, error)
Snapshot() (pb.Snapshot, error)
}
type MemoryStorage struct {
sync.Mutex
hardState pb.HardState
snapshot pb.Snapshot
ents []pb.Entry
}
如上是,Storage
接口和 MemoryStorage
结构体的定义。
ETCD 实现
实际上是在 etcdserver/storage.go
中的实现,其接口定义名称与上述相同,同样也是 type Storage interface
,注意不要将两者混淆。
type Storage interface {
Save(st raftpb.HardState, ents []raftpb.Entry) error
SaveSnap(snap raftpb.Snapshot) error
Close() error
}
type storage struct {
*wal.WAL
*snap.Snapshotter
}
在上述定义的 type storage struct
结构体中,根据 Go 语言的特性,因为没有声明成员变量的名字,可以直接使用 WAL 和 Snapshotter 定义的方法,也就是该结构体是对后两者的封装。
这里的 storage
和 MemoryStorage
的结合使用就是在 etcdserver/raft.go
实现,对应了 type raftNode struct
结构体,其中包含的是 type raftNodeConfig struct
,也就是真正的封装。
type raftNodeConfig struct {
isIDRemoved func(id uint64) bool
raft.Node
raftStorage *raft.MemoryStorage
storage Storage
heartbeat time.Duration // for logging
transport rafthttp.Transporter
}
如上,其中 raftStorage
是提供给 RAFT 协议层使用的,而 storage
则是 ETCD 实现持久化存贮的核心内容。
启动流程
NewServer() etcdserver/server.go
|-store.New() store/store.go 根据入参创建一个初始化的目录
| |-newStore() 创建数据存储的目录
|-snap.New() snap/snapshotter.go 这里只是初始化一个对象,并未做实际操作
|-openBackend() etcdserver/backend.go
| |-newBackend() 在新的协程中打开,同时会设置10秒的超时时间
|
| <<<haveWAL>>> 存在WAL日志,也就是非第一次部署
|-Snapshotter.Load() snap/snapshotter.go 开始加载snapshot
| |-Snapshotter.snapNames() 会遍历snap目录下的文件,并逆序排列返回
| |-loadSnap() 依次加载上述返回的snap文件
| |-Read() 读取文件,如果报错那么会添加一个.broken的后缀
| |-ioutil.ReadFile() 调用系统接口读取文件
| |-snappb.Unmarshal() 反序列化
| |-crc32.Update() 更新并校验CRC的值
| |-raftpb.Unmarshal() 再次反序列化获取值
|-store.Recovery() store/store.go 从磁盘中恢复数据
| |-json.Unmarshal() snap中保存的应该是json体
| |-newTtlKeyHeap() 一个TTL的最小栈,用来查看将要过期的数据
| |-recoverAndclean() ???没有理清楚具体删除的是什么过期数据
|-recoverSnapshotBackend() etcdserver/backend.go 开始恢复snapshot
| |-openSnapshotBackend() 这里会将最新的一次的snapshot重命名为DB
| |-openBackend()
|
|-restartNode() etcdserver/raft.go
| |-readWAL()
| |-raft.NewMemoryStorage()
| |-ApplySnapshot()
| |-SetHardState()
| |-Append()
| |-RestartNode()
|-SetStore()
|-SetBackend()
|-Recover()