在使用 RAFT 协议内核时,需要单独实现网络的通讯协议,也就是集群中各个结点之间的通讯,除此之外还有客户端与服务器之间的通讯。
这里简单介绍其实现方式。
集群通讯
实际上,在 ETCD 中有两个与网络交互的流处理对象:streamReader
和 streamWriter
,分别用来处理网络的读写流量。
两者的实现都保存在 etcdserver/api/rafthttp/stream.go
中的实现。
gRPC
V3 版本中与客户端的通讯,使用 gRPC 替换掉了 HTTP ,而服务器各个节点之间的通讯还是使用类似 HTTP 的请求(与真正的 HTTP 请求类似,但是有部分区别,后面详述)。
其中与服务端相关的 RPC 保存在 etcdserver/etcdserverpb/rpc.proto
文件中,通过 service 定义了一系列的 RPC 请求方法,对应的请求和响应报文则在最后定义。
可以使用 scripts/genproto.sh
生成 go 文件,此时会生成两个 rpc.pb.go
以及 rpc.pb.gw.go
后者为一个反向代理,用于将 http 请求转换为 grpc 再发送到后端,可以使用 curl 命令调试。
服务启动
其入口在 etcdserver/api/v3rpc/grpc.go
文件中,而对应各个服务的实现保存在 etcdserver/api/v3rpc
目录下。
示例代码
package main
import (
"fmt"
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"time"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 10 * time.Second
endpoints = []string{"127.0.0.1:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: dialTimeout,
})
if err != nil {
println(err)
}
defer cli.Close()
fmt.Println("Start to running ...")
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "/test/hello", "world")
cancel()
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "/test/hello")
cancel()
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
_, err = cli.Put(context.TODO(), "key", "xyz")
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Txn(ctx).
If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
Then(clientv3.OpPut("key", "XYZ")).
Else(clientv3.OpPut("key", "ABC")).
Commit()
cancel()
rch := cli.Watch(context.Background(), "/test/hello", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
if err != nil {
println(err)
}
}
通过如下命令运行。
GOPATH=$YOUR_WORKSPACE/gopath:$GOPATH go run main.go