您现在的位置是:首页 > 文章详情

kubernetes集群中利用etcd和grpc实现golang服务间通信

日期:2018-11-29点击:725

kubernetes集群中利用etcd和grpc实现golang服务间通信

注:文中涉及工作环境相关的网址和IP已经被替换


1. 项目背景

服务运行于docker容器中

使用kubernetes管理容器

服务有多个节点作为一个集群

使用rest接口设置服务缓存中的信息

需要将信息同步到集群中其他节点

2. 项目方案

使用grpc做服务间通信

从etcd中读取服务所有状态为running的节点信息,包括:podIp、status、hostIp、startedAt(启动时间)

服务启动时选取运行时间最长的节点,调用grpc接口请求缓存的信息同步到本容器的服务中

使用rest接口设置缓存的时候,遍历所有节点(不包括自身),调用grpc接口将信息同步到其他节点

方案特点:

- 不需要借助额外的配置管理工具(如:zookeeper)

- 不需要自行管理节点的配置信息(因为kubernetes的etcd中已经有完整的节点信息)

- grpc开发、传输效率高,扩展性好

- grpc使用http2.0方便后续提供rest接口


1. etcd简介

etcd 是用 golang 实现的一种 K-V 分布式存储系统,内部用raft协议做一致性校验,对外提供http的访问接口,最新版中提供了grpc的访问接口。

etcd主要用于:

- 配置管理

- 服务注册于发现

- 选主

- 应用调度

- 分布式队列

- 分布式锁


与etcd类似的还有zookeeper

这里 有一篇文章简单介绍了etcd和zookeeper的优缺点以及etcd的工作原理


2. kubernetes与etcd

前面介绍了etcd特别适合用于做集群服务的配置管理,kubernets 是用于docker容器编排的,也是用golang实现的,所以自然而然就采用etcd作为服务配置的存储方式了。这里 有一篇kubernets的架构介绍。


etcd在kubernetes中的最大作用是保存容器节点(pod)信息,包括:容器的服务名、状态、IP、版本以及其他信息


通过类似如下的命令可以获取到pod的信息

curl http://10.20.30.40:2379/v2/keys/registry/pods/default

etcd中保存的容器节点信息格式如下:

{     "action": "get",     "node": {       "key": "/registry/pods/default",       "dir": true,       "nodes": [         {           "key": "/registry/pods/default/hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh",           "value": "{\"kind\":\"Pod\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh\",\"generateName\":\"hello-web-29a74e26ea3c2138e1727f35a111f4c6-\",\"namespace\":\"default\",\"selfLink\":\"/api/v1/namespaces/default/pods/hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh\",\"uid\":\"09c45029-3fa0-11e7-a46c-00163e327954\",\"creationTimestamp\":\"2017-05-23T10:10:24Z\",\"labels\":{\"app\":\"hello\",\"deployment\":\"bb6de7bfc7f357818a8c07faf3987d40\",\"tier\":\"frontend\"},\"annotations\":{\"kubernetes.io/created-by\":\"{\\\"kind\\\":\\\"SerializedReference\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"reference\\\":{\\\"kind\\\":\\\"ReplicationController\\\",\\\"namespace\\\":\\\"default\\\",\\\"name\\\":\\\"hello-web-29a74e26ea3c2138e1727f35a111f4c6\\\",\\\"uid\\\":\\\"e42ce61a-3f9f-11e7-a46c-00163e327954\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"resourceVersion\\\":\\\"4361319\\\"}}\\n\"},\"ownerReferences\":[{\"apiVersion\":\"v1\",\"kind\":\"ReplicationController\",\"name\":\"hello-web\",\"uid\":\"32559b88-3fa0-11e7-a46c-00163e327954\",\"controller\":true}]},\"spec\":{\"containers\":[{\"name\":\"hello-web\",\"image\":\"docker.helloword.com/hello-web:f022d25\",\"ports\":[{\"containerPort\":8087,\"protocol\":\"TCP\"}],\"env\":[{\"name\":\"SERVER\",\"valueFrom\":{\"configMapKeyRef\":{\"name\":\"cluster-config\",\"key\":\"external.ip\"}}},{\"name\":\"SERVER_PORT\",\"valueFrom\":{\"configMapKeyRef\":{\"name\":\"hello-config\",\"key\":\"hello.api.port\"}}}],\"resources\":{\"limits\":{\"cpu\":\"1\",\"memory\":\"1Gi\"},\"requests\":{\"cpu\":\"100m\",\"memory\":\"512Mi\"}},\"terminationMessagePath\":\"/dev/termination-log\",\"imagePullPolicy\":\"IfNotPresent\"}],\"restartPolicy\":\"Always\",\"terminationGracePeriodSeconds\":30,\"dnsPolicy\":\"ClusterFirst\",\"nodeName\":\"10.30.58.179\",\"securityContext\":{},\"imagePullSecrets\":[{\"name\":\"cn-registry\"}]},\"status\":{\"phase\":\"Running\",\"conditions\":[{\"type\":\"Initialized\",\"status\":\"True\",\"lastProbeTime\":null,\"lastTransitionTime\":\"2017-05-23T10:10:24Z\"},{\"type\":\"Ready\",\"status\":\"True\",\"lastProbeTime\":null,\"lastTransitionTime\":\"2017-05-23T10:10:29Z\"},{\"type\":\"PodScheduled\",\"status\":\"True\",\"lastProbeTime\":null,\"lastTransitionTime\":\"2017-05-23T10:10:24Z\"}],\"hostIP\":\"10.30.58.179\",\"podIP\":\"172.80.13.4\",\"startTime\":\"2017-05-23T10:10:24Z\",\"containerStatuses\":[{\"name\":\"hello-web\",\"state\":{\"running\":{\"startedAt\":\"2017-05-23T10:10:29Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":0,\"image\":\"docker.helloword.com/hello-web:f022d25\",\"imageID\":\"docker-pullable://docker.helloword.com/hello-web@sha256:f8e0460983b0d3f87733453b588469d8e225afbfc764da2ae55238cd524ef70a\",\"containerID\":\"docker://78cd912de942f744a36bd51907562c5e670fb300ddc85267e3ec72572fdb5617\"}]}}\n",           "modifiedIndex": 4361528,           "createdIndex": 4361320         }       ]     } }

其中value部分的json数据格式化后如下:

{   "kind": "Pod",   "apiVersion": "v1",   "metadata": {     "name": "hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh",     "generateName": "hello-web-29a74e26ea3c2138e1727f35a111f4c6-",     "namespace": "default",     "selfLink": "/api/v1/namespaces/default/pods/hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh",     "uid": "09c45029-3fa0-11e7-a46c-00163e327954",     "creationTimestamp": "2017-05-23T10:10:24Z",     "labels": {       "app": "hello",       "deployment": "bb6de7bfc7f357818a8c07faf3987d40",       "tier": "frontend"     },     "annotations": {       "kubernetes.io/created-by": "{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"hello-web-29a74e26ea3c2138e1727f35a111f4c6\",\"uid\":\"e42ce61a-3f9f-11e7-a46c-00163e327954\",\"apiVersion\":\"v1\",\"resourceVersion\":\"4361319\"}}\n"     },     "ownerReferences": [       {         "apiVersion": "v1",         "kind": "ReplicationController",         "name": "hello-web",         "uid": "32559b88-3fa0-11e7-a46c-00163e327954",         "controller": true       }     ]   },   "spec": {     "containers": [       {         "name": "hello-web",         "image": "docker.helloword.com/hello-web:f022d25",         "ports": [           {             "containerPort": 8087,             "protocol": "TCP"           }         ],         "env": [           {             "name": "SERVER",             "valueFrom": {               "configMapKeyRef": {                 "name": "cluster-config",                 "key": "external.ip"               }             }           },           {             "name": "SERVER_PORT",             "valueFrom": {               "configMapKeyRef": {                 "name": "hello-config",                 "key": "hello.api.port"               }             }           }         ],         "resources": {           "limits": {             "cpu": "1",             "memory": "1Gi"           },           "requests": {             "cpu": "100m",             "memory": "512Mi"           }         },         "terminationMessagePath": "/dev/termination-log",         "imagePullPolicy": "IfNotPresent"       }     ],     "restartPolicy": "Always",     "terminationGracePeriodSeconds": 30,     "dnsPolicy": "ClusterFirst",     "nodeName": "10.30.58.179",     "securityContext": {},     "imagePullSecrets": [       {         "name": "cn-registry"       }     ]   },   "status": {     "phase": "Running",     "conditions": [       {         "type": "Initialized",         "status": "True",         "lastProbeTime": null,         "lastTransitionTime": "2017-05-23T10:10:24Z"       },       {         "type": "Ready",         "status": "True",         "lastProbeTime": null,         "lastTransitionTime": "2017-05-23T10:10:29Z"       },       {         "type": "PodScheduled",         "status": "True",         "lastProbeTime": null,         "lastTransitionTime": "2017-05-23T10:10:24Z"       }     ],     "hostIP": "10.30.58.179",     "podIP": "172.80.13.4",     "startTime": "2017-05-23T10:10:24Z",     "containerStatuses": [       {         "name": "hello-web",         "state": {           "running": {             "startedAt": "2017-05-23T10:10:29Z"           }         },         "lastState": {},         "ready": true,         "restartCount": 0,         "image": "docker.helloword.com/hello-web:f022d25",         "imageID": "docker-pullable://docker.helloword.com/hello-web@sha256:f8e0460983b0d3f87733453b588469d8e225afbfc764da2ae55238cd524ef70a",         "containerID": "docker://78cd912de942f744a36bd51907562c5e670fb300ddc85267e3ec72572fdb5617"       }     ]   } }

3. grpc简介

grpc是google实现的一种基于protobuf的远程服务调用框架,数据采用二进制传输,其传输协议是基于http2.0。


相比于其他各种rpc框架,grpc由于基于protobuf和http2.0,具有以下优点:

- 通用性好,支持各种语言

- 二进制传输,效率高

- 扩展性好,只需要修改protobuf文件并重新生成代码


4. grpc开发环境搭建

4.1 protobuf环境

首先,去https://github.com/google/protobuf/releases/tag/v3.3.0 这个页面下载对应的protobuf编译器安装文件并安装好protoc

go get -u github.com/golang/protobuf cd $GOPATH/src/github.com/golang/protobuf # 如果有安装makefile,直接执行make install,如果没有则执行以下命令 go install ./proto ./jsonpb ./ptypes go install ./protoc-gen-go

4.2 grpc环境

#安装grpc依赖库 go get -u google.golang.org/grpc #安装grpc-go插件,用于将proto文件编译成grpc的golang代码 go get -u github.com/grpc/grpc-go cd $GOPATH/src mv github.com/grpc/grpc-go google.golang.org/grpc/grpc-go

遇到go get无法下载的包,也可以通过 http://gopm.io/ 或者 http://golangtc.com/download/package 进行下载


5. 定义proto文件

syntax = "proto3"; //使用proto3版本 //用于java等语言的package配置 option java_multiple_files = true; option java_package = "io.grpc.examples.hellorpc"; option java_outer_classname = "hellorpcProto"; //用于golang等语言的package配置 package hellorpc; //定义服务接口,其中rpc关键字表示 rpc 接口,用于生成grpc接口代码 service Sync {     rpc Get (SyncRequest) returns(SyncResponse) {}     rpc Set (SyncRequest) returns(SyncResponse) {}     rpc GetAll(SyncRequest)returns(SyncResponse) {} } //定义请求数据类型, repeated最终会转换成golang中的数组/切片 message SyncRequest {     repeated SyncData data= 1; } //定义返回的数据类型 message SyncResponse {     repeated SyncData data= 1; } //定义实体数据类型,用type字段表示请求的数据类型,用data字段保存请求的数据或者返回的数据 //map最终会转换成golang中的map[string]string类型 message SyncData {     int32 type = 1;     map data = 2; }

编译proto文件

protoc --go_out=plugins=grpc:./hellorpc hellorpc.proto

其中–go_out用于指定go的proto编译插件以及插件参数

编译成功后,会在 hellorpc目录中生成 hellorpc.pb.go 文件,可以在其他go文件中通过 import “hello-api/hellorpc” 来使用文件中定义的接口


6. hellorpc.pb.go 文件分析

前面提到的 service Sync 部分会编译成如下两部分

type SyncClient interface {     Get(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error)     Set(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error)     GetAll(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error) } type SyncServer interface {     Get(context.Context, *SyncRequest) (*SyncResponse, error)     Set(context.Context, *SyncRequest) (*SyncResponse, error)     GetAll(context.Context, *SyncRequest) (*SyncResponse, error) }

其中 SyncClient 的接口 在 hellorpc.pb.go 里面已经实现好了接口,直接调用即可,但SyncServer定义的接口是需要我们自己实现


7. 服务端代码实现(rtc_server.go)

//先定义server类型,并实现好SyncServer定义的接口 type server struct {} const (     HELLO_SYNC_REST_CLUSTER_INFO = iota ) func (s *server)Get(ctx context.Context, in *hellorpc.SyncRequest) (*hellorpc.SyncResponse, error){     var response = hellorpc.SyncResponse{Data: make([]*hellorpc.SyncData, 0, 10)}     for i := 0; i < len(in.Data); i++{ request := in.Data[i] switch request.Type { case hello_SYNC_REST_CLUSTER_INFO: // get something from local cache and set to response break } } return &response, nil } func (s *server)Set(ctx context.Context, in *hellorpc.SyncRequest) (*hellorpc.SyncResponse, error){ var response = hellorpc.SyncResponse{Data: make([]*hellorpc.SyncData, 0, 10)} for i := 0; i < len(in.Data); i++{ request := in.Data[i] switch request.Type { case HELLO_SYNC_REST_CLUSTER_INFO: // set something to local cache, and set the result to response break } } return &response, nil } func (s *server)GetAll(ctx context.Context, in *hellorpc.SyncRequest) (*hellorpc.SyncResponse, error){ var response = hellorpc.SyncResponse{Data: make([]*hellorpc.SyncData, 0, 10)} for i := 0; i < len(in.Data); i++{ request := in.Data[i] switch request.Type { case HELLO_SYNC_REST_CLUSTER_INFO: // get all data from local cache, and set the result to response break } } return &response, nil }

实现好接口后,我们需要将服务注册到grpc,这里我们实现一个名为StartSyncServer的函数来做这些事情

func StartSyncServer(address string) error{     lis, err := net.Listen("tcp", address)     if err != nil {         beego.Debug("start sync server error: %v", err)         return err     }     s := grpc.NewServer()     hellorpc.RegisterSyncServer(s, &server{})     //由于s.Serve方法是会一直阻塞住,所以我们需要起一个go routine来执行,在其停止后输出错误信息     go func(){         err := s.Serve(lis)         beego.Debug("sync server stopped with error: %v", err)     }()     return nil }

将StartSyncServer函数添加到模块的 init 函数中执行,我们服务端的代码就基本完成了


8. 客户端代码实现(rtc_client.go)

//先定义好客户端类型syncClient,这里我们利用继承的方式将hellorpc.SyncClient实现的方法继承过来 type syncClient struct{     hellorpc.SyncClient     conn *grpc.ClientConn     address string } func OpenSyncClient(address string)(syncClient, error) {     s := syncClient{}     //grpc.WithInsecure用于关闭安全验证,因为我们是在docker内部环境里使用,不暴露在外网,就没有加安全认证了     conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))     if err != nil {         fmt.Println("----open client error %v, conn: %v", err, conn)         return s, err     }     s.conn = conn     s.address = address     s.client = hellorpc.NewSyncClient(conn)     return s, nil } func CloseSyncClient(s *syncClient) {     if s.conn != nil {         s.conn.Close()         s.conn = nil         s.client = nil     } }

这样我们只需要编写 c, err := OpenSyncClient(address),既可通过 response, err := c.Get(context.Background(), request) 的方式调用hellorpc.SyncClient定义的方法


9. etcd客户端代码实现(部分功能)

根据etcd的返回值数据结构,我们定义一下两种类型的数据

//用于保存etcd的返回的数据 type EtcdData struct{     Key             string     Dir             bool     Value           interface{}     CreatedIndex    int32     ModifiedIndex   int32     Nodes           []EtcdData } //用于保存pod相关的数据 type PodData struct {     Name            string     PodIP           string     HostIP          string     Status          string     UpdateTime      string     Timestamp       int64 } func newEtcdData() EtcdData{     return EtcdData{Dir: false, Value: "", Key: "", Nodes: make([]EtcdData, 0, 100)} }

接下来我们实现EtcdClient

//先定义好EtcdClient的数据结构 type EtcdClient struct{} //用于解析etcd返回的数据 func parseEtcdData(dataIn map[string]interface{}, dataOut *EtcdData) error {     if key, ok := dataIn["key"]; ok {         dataOut.Key = key.(string)     }     if isDir, ok := dataIn["dir"]; ok {         dataOut.Dir = isDir.(bool)     }     if value, ok := dataIn["value"]; ok {         dataOut.Value = value     }     if createdIndex, ok := dataIn["createdIndex"]; ok {         dataOut.CreatedIndex = int32(createdIndex.(float64))     }     if modifiedIndex, ok := dataIn["modifiedIndex"]; ok {         dataOut.ModifiedIndex = int32(modifiedIndex.(float64))     }     if nodes, ok := dataIn["nodes"]; ok {         var subnodes = nodes.([]interface{})         for i := 0; i < len(subnodes); i++{ node := subnodes[i].(map[string]interface{}) var nodeData = newEtcdData() parseEtcdData(node, &nodeData) dataOut.Nodes = append(dataOut.Nodes,nodeData) } } return nil } //实现Get方法用于获取某个key的值 func (c *EtcdClient)Get(baseUrl, key string)(EtcdData, error){ var url = baseUrl + key var res = newEtcdData() var result = make(map[string]interface{}) resp, err := http.Get(url) if err == nil{ out, err1 := ioutil.ReadAll(resp.Body) if err1 == nil{ err2 := json.Unmarshal([]byte(out), &result) if err2 != nil{ return res, err2 } node := result["node"].(map[string]interface{}) err = parseEtcdData(node, &res) }else{ return res, err1 } } return res, err }

由于我们的服务是跑在docker里,由kubernetes进行服务编排,所以我们需要解析kubernetes在etcd中保存的数据

//用于解析pod的状态信息func parsePodStatus(podStatus interface{}, podData *PodData){     pod_status := podStatus.(map[string]interface{})    if podIP, ok := pod_status["podIP"]; ok {         podData.PodIP = podIP.(string)     }    if hostIP, ok := pod_status["hostIP"]; ok {         podData.HostIP = hostIP.(string)     }    if status, ok := pod_status["phase"]; ok{         podData.Status = strings.ToLower(status.(string))        if containerStatuses, ok := pod_status["containerStatuses"]; ok{            for i := 0; i 

10. EtcdClient结合SyncClient

拿到了服务所有容器的IP

遍历所有pod的IP

使用IP+端口建立连接(OpenSyncClient)

执行grpc server端提供的服务接口,如:c.Get …

校验/处理返回值(同步本地信息)

断开连接(CloseSyncClient)

11. 总结

直接使用kubernetes的etcd,主要是因为kubernetes的etcd已经有所有节点的信息,不需要另外再维护节点信息

protobuf文件中的request、response数据结构中使用repeate以及SyncData采用map,主要用于批量请求、返回结果以及方便扩展

自定义数据结构保存etcd返回的数据,而不是直接使用json处理后的数据,主要是因为各接口之间使用方便,更易于维护。

使用继承的方式来扩展的接口,可以有效减少代码量


原文链接:https://blog.roncoo.com/article/128938
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章