ETCD 网络模块

2017-12-05 database

在使用 RAFT 协议内核时,需要单独实现网络的通讯协议,也就是集群中各个结点之间的通讯,除此之外还有客户端与服务器之间的通讯。

这里简单介绍其实现方式。

集群通讯

实际上,在 ETCD 中有两个与网络交互的流处理对象:streamReaderstreamWriter,分别用来处理网络的读写流量。

两者的实现都保存在 etcdserver/api/rafthttp/stream.go 中的实现。

gRPC

V3 版本中与客户端的通讯,使用 gRPC 替换掉了 HTTP ,而服务器各个节点之间的通讯还是使用类似 HTTP 的请求(与真正的 HTTP 请求类似,但是有部分区别,后面详述)。

其中与服务端相关的 RPC 保存在 etcdserver/etcdserverpb/rpc.proto 文件中,通过 service 定义了一系列的 RPC 请求方法,对应的请求和响应报文则在最后定义。

可以使用 scripts/genproto.sh 生成 go 文件,此时会生成两个 rpc.pb.go 以及 rpc.pb.gw.go 后者为一个反向代理,用于将 http 请求转换为 grpc 再发送到后端,可以使用 curl 命令调试。

服务启动

其入口在 etcdserver/api/v3rpc/grpc.go 文件中,而对应各个服务的实现保存在 etcdserver/api/v3rpc 目录下。

示例代码

package main

import (
        "fmt"
        "github.com/coreos/etcd/clientv3"
        "golang.org/x/net/context"
        "time"
)

var (
        dialTimeout    = 5 * time.Second
        requestTimeout = 10 * time.Second
        endpoints      = []string{"127.0.0.1:2379"}
)

func main() {
        cli, err := clientv3.New(clientv3.Config{
                Endpoints:   []string{"127.0.0.1:2379"},
                DialTimeout: dialTimeout,
        })
        if err != nil {
                println(err)
        }
        defer cli.Close()

        fmt.Println("Start to running ...")
        ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
        _, err = cli.Put(ctx, "/test/hello", "world")
        cancel()

        ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
        resp, err := cli.Get(ctx, "/test/hello")
        cancel()

        for _, ev := range resp.Kvs {
                fmt.Printf("%s : %s\n", ev.Key, ev.Value)
        }

        _, err = cli.Put(context.TODO(), "key", "xyz")
        ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
        _, err = cli.Txn(ctx).
                If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
                Then(clientv3.OpPut("key", "XYZ")).
                Else(clientv3.OpPut("key", "ABC")).
                Commit()
        cancel()

        rch := cli.Watch(context.Background(), "/test/hello", clientv3.WithPrefix())
        for wresp := range rch {
                for _, ev := range wresp.Events {
                        fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                }
        }

        if err != nil {
                println(err)
        }
}

通过如下命令运行。

GOPATH=$YOUR_WORKSPACE/gopath:$GOPATH go run main.go