go微服务框架go-micro深度学习(五) stream 调用过程详解
上一篇写了一下rpc调用过程的实现方式,简单来说就是服务端把实现了接口的结构体对象进行反射,抽取方法,签名,保存,客户端调用的时候go-micro封请求数据,服务端接收到请求时,找到需要调用调用的对象和对应的方法,利用反射进行调用,返回数据。 但是没有说stream的实现方式,感觉单独写一篇帖子来说这个更好一些。上一篇帖子是基础,理解了上一篇,stream实现原理一点即破。先说一下使用方式,再说原理。
当前go-micro对 rpc 调用的方式大概如下:
普通的rpc调用 是这样:
1.连接服务器或者从缓存池得到连接 2.客户端 ->发送数据 -> 服务端接收 3.服务端 ->返回数据 -> 客户端处理数据 4.关闭连接或者把连接返回到缓存池
当前 rps stream的实现方式 是这样子:
1. 连接服务器 2. 客户端多次发送请求-> 服务端接收 3. 服务端多次返回数据-> 客户端处理数据 4. 关闭连接
当数据量比较大的时候我们可以用stream方式分批次传输数据。对于客户端还是服务端没有限制,我们可以根据自己的需要使用stream方式,使用方式也非常的简单,在定义接口的时候在参数或者返回值前面加上stream然后就可以多次进行传输了,使用的代码还是之前写的例子,代码都在github上:
比如我的例子中定义了两个使用stream的接口,一个只在返回值使用stream,另一个是在参数和返回值前都加上了stream,最终的使用方式没有区别
rpc Stream(model.SRequest) returns (stream model.SResponse) {} rpc BidirectionalStream(stream model.SRequest) returns (stream model.SResponse) {}
看一下go-micro为我们生成的代码rpcapi.micro.go里,不要被吓到,生成了很多代码,但是没啥理解不了的
Server端
// Server API for Say service type SayHandler interface { // .... others Stream(context.Context, *model.SRequest, Say_StreamStream) error BidirectionalStream(context.Context, Say_BidirectionalStreamStream) error } type Say_StreamStream interface { SendMsg(interface{}) error RecvMsg(interface{}) error Close() error Send(*model.SResponse) error } type Say_BidirectionalStreamStream interface { SendMsg(interface{}) error RecvMsg(interface{}) error Close() error Send(*model.SResponse) error Recv() (*model.SRequest, error) } // .... others
Client端
// Client API for Say service type SayService interface { //... others Stream(ctx context.Context, in *model.SRequest, opts ...client.CallOption) (Say_StreamService, error) BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error) } type Say_StreamService interface { SendMsg(interface{}) error RecvMsg(interface{}) error Close() error Recv() (*model.SResponse, error) } type Say_BidirectionalStreamService interface { SendMsg(interface{}) error RecvMsg(interface{}) error Close() error Send(*model.SRequest) error Recv() (*model.SResponse, error) }
你会发现参数前面加了 Stream后,生成的代码会把你的参数变成一个接口,这个接口主要要的方法是
SendMsg(interface{}) error RecvMsg(interface{}) error Close() error
剩下的两个接口方法是根据你是发送还是接收生成的,如果有发送就会有Send(你的参数),如果有接收会生成Rev() (你的参数, error),但这两个方法只是为了让你使用时方便,里面调用的还是SendMsg(interface)和RecvMsg(interface)方法,但是他们是怎么工作的,如何多次发送和接收传输的数据,是不是感觉很神奇。
我就以TsBidirectionalStream
方法为例开始分析,上一篇和再早之前的帖子已经说了服务端启动的时候都做了哪些操作,这里就不再赘述,
服务端的实现,很简单,不断的获取客户端发过来的数据,再给客户端一次一次的返回一些数据。
/* 模拟数据 */ func (s *Say) BidirectionalStream(ctx context.Context, stream rpcapi.Say_BidirectionalStreamStream) error { for { req, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } for i := int64(0); i < req.Count; i++ { if err := stream.Send(&model.SResponse{Value: []string {lib.RandomStr(lib.Random(3, 6))}}); err != nil { return err } } } return nil }
启动服务,服务开始监听客户端传过来的数据.....
客户端调用服务端方法:
// 调用 func TsBidirectionalStream(client rpcapi.SayService) { rspStream, err := client.BidirectionalStream(context.Background()) if err != nil { panic(err) } // send go func() { rspStream.Send(&model.SRequest{Count: 2}) rspStream.Send(&model.SRequest{Count: 5}) // close the stream if err := rspStream.Close(); err != nil { fmt.Println("stream close err:", err) } }() // recv idx := 1 for { rsp, err := rspStream.Recv() if err == io.EOF { break } else if err != nil { panic(err) } fmt.Printf("test stream get idx %d data %v\n", idx, rsp) idx++ } fmt.Println("Read Value End") }
当客户端在调用rpc的stream方法是要很得到stream
rspStream, err := client.BidirectionalStream(context.Background()) // func (c *sayService) BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error) { req := c.c.NewRequest(c.name, "Say.BidirectionalStream", &model.SRequest{}) stream, err := c.c.Stream(ctx, req, opts...) if err != nil { return nil, err } return &sayServiceBidirectionalStream{stream}, nil }
这个调用c.c.Stream(ctx, req, opts...)
是关键,他的内部实现就是和服务器进行连接,然后返回一个stream,进行操作。
客户端:和服务端建立连接,返回Stream,进行接收和发送数据 服务端:接收客户端连接请求,利用反射找到相应的方法,组织Strem,传给方法,进行数据的发送和接收
建立连接的时候就是一次rpc调用,服务端接受连接,然后客户端发送一次调用,但是传输的是空数据,服务端利用反射找到具体的方法,组织stream,调用具体方法,利用这个连接,客户端和服务端进行多次通信。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
JVM的4种垃圾回收算法、垃圾回收机制与总结
本系列持续更新。 一、垃圾回收算法 1.标记清除 标记-清除算法将垃圾回收分为两个阶段:标记阶段和清除阶段。 在标记阶段首先通过根节点(GC Roots),标记所有从根节点开始的对象,未被标记的对象就是未被引用的垃圾对象。然后,在清除阶段,清除所有未被标记的对象。 适用场合: 存活对象较多的情况下比较高效适用于年老代(即旧生代)缺点: 容易产生内存碎片,再来一个比较大的对象时(典型情况:该对象的大小大于空闲表中的每一块儿大小但是小于其中两块儿的和),会提前触发垃圾回收扫描了整个空间两次(第一次:标记存活对象;第二次:清除没有标记的对象) 2.复制算法 从根集合节点进行扫描,标记出所有的存活对象,并将这些存活的对象复制到一块儿新的内存(图中下边的那一块儿内存)上去,之后将原来的那一块儿内存(图中上边的那一块儿内存)全部回收掉 现在的商业虚拟机都采用这种收集算法来回收新生代。 适用场合: 存活对象较少的情况下比较高效扫描了整个空间一次(标记存活对象并复制移动)适用于年轻代(即新生代):基本上98%的对象是"朝生夕死"的,存活下来的会很少 缺点: 需要一块儿空的内存空间需要复制移动对象 **...
- 下一篇
给全文搜索引擎Manticore (Sphinx) search 增加中文分词
Sphinx search 是一款非常棒的开源全文搜索引擎,它使用C++开发,索引和搜索的速度非常快,我使用sphinx的时间也有好多年了。最初使用的是coreseek,一个国人在sphinxsearch基础上添加了mmseg分词的搜索引擎,可惜后来不再更新,sphinxsearch的版本太低,bug也会出现;后来也使用最新的sphinxsearch,它可以支持几乎所有语言,通过其内置的ngram tokenizer对中文进行索引和搜索。 但是,像中文、日文、韩文这种文字使用ngram还是有很大弊端的: 当Ngram=1时,中文(日文、韩文)被分解成一个个的单字,就像把英文分解成一个个字母那样。这会导致每个单字的索引很长,搜索效率下降,同时搜索结果习惯性比较差。 当Ngram=2或更大时,会产生很多无意义的“组合”,比如“的你”、“为什”等,导致索引的字典、索引文件等非常大,同时也影响搜索速度。 基于以上弊端,为中日韩文本加入分词的tokenizer是很有必要的。 于是决定来做这件事。先去Sphinxsearch网站去看看,发现它已经发布了新的3.x版本,而且加入了很多很棒的特性,然而...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Mario游戏-低调大师作品