Giraph源码分析(三)—— 消息通信
由前文知道每个BSPServiceWorker有一个WorkerServer对象,WorkerServer对象里面又有ServerData对象,作为数据实。ServerData中包含该Worker的partitionStore、edgeStore、incomingMessageStore、currentMessageStore、聚集值等。其中incomingMessageStore对象为MessageStoreByPartition(接口)类型,也就是说消息时按照分区来存储的。MessageStoreByPartition接口的关系图如下:
在SimpleMessageStore抽象类中,有一个ConcurrentMap>类型的变量map,用来存储消息。第一层是pairtitionID到发送到该partition消息的映射;第二层是VertexID 到发送给该Vertex的消息队列。
《Giraph通信模块分析》:http://my.oschina.net/skyaugust/blog/95182
每个顶点的消息列表具体为ExtendedDataOutput类型,它继承DataOutput接口,增加了几个方法而已。每个消息是以字节形式写入到ExtendedDataOutput对象中的。
发送消息时,采用异步式通信。
图顶点的计算处理与消息通信并发执行,在计算过程中就可以发送消息,将大规模消息发送分散在不同的时间段,避免瞬时网络通信阻塞,但是接受端需要额外的空间,存储临时接收到的消息,相当于空间换时间。而集中式通信,图顶点的计算处理与消息通信串行进行,在计算完毕后,统一发送消息,控制和实现方式简单,可在发送端对消息进行最大程度优化,但容易造成瞬时间的网络通信阻塞以及增加发送端的消息存储开销。
不同Worker间的消息通信使用RPC方式,具体为Netty。同一Worker内,连续两次迭代的消息直接通过内存操作,把要发送的消息直接复制到Worker的incomingMessageStore中。下面详述消息的存储格式和发送机制。
Giraph使用Cache来缓存消息,当消息达到一定阈值后,一次性发送。
既按照bulk模式进行,不会一条一条信息发送。向某个顶点发送的消息是按照 pair存储在ByteArrayVertexIdData中(实际为ByteArrayVertexIdMessages类型)。介绍如下: org.apache.giraph.utils.ByteArrayVertexIdData
功能:把<顶点ID,data> Pair 存储在一个 byte数组中。里面有 ExtendedDataOutput对象用来存储数据。
该类中还有一个内部类:VertexIdDataIterator,该内部类继承 VertexIdIterator类。
org.apache.giraph.comm.SendCache用来缓存发送的信息,然后以“Bulk”模式发送。在Giraph中,每个Worker上可以对应多个分区。消息缓存的阈值是以Worker为单位计算,而不是Partition。
SendCache中有ByteArrayVertexIdData[ ] dataCache数组用来存储发送给每个Partition的消息;有int[ ] dataSizes数组用于记录向每个Worker发送的消息大小,若大于MAX_MSG_REQUEST_SIZE(默认为512KB)就把此Worker上的所有Partition缓存的消息发送到给该Worker,同一Worker内消息也是如此缓存;有int[ ] initBufferSizes数组用于记录每个Worker上的每个Partition的初始化ByteArrayVertexIdData中ExtendedDataOutput对象的大小,同一Worker上的所有Partition初始值相同,该值为平均值。记MAX_MSG_REQUEST_SIZE(message request size)值为M, 该Worker上有P个 partitions,ADDTITIONNAL_MSG_REQUEST_SIZE(比平均值大的因子)默认为0.2f,记为A。则每个Partition的初始大小为:M*(1+A) / P .
由前文知道,每个Worker都有一个NettyWorkerClientRequestProcessor用来发送消息。该类中有SendMessageCache对象用来缓存向外发送的信息。NettyWorkerClientRequestProcessor类中的sendMessageRequest(I,M)
方法如下,用于向某个顶点destVertexId发送消息message。
方法解释:首先根据destVertexId得到对应的partitionId和WorkerInfo,然后把消息add到SendMessageCache中,并返回向该顶点所属Worker发送的消息大小workerMessageSize。若该值大于默认值512KB,则把此Worker对应的所有Partition消息从SendMessageCache中删除,把删除的消息赋值给workerMessages,其类型为PairList> ,key为partitionId,value为发送给该partition的消息列表,最后调用doRequest()方法发送信息。doRequest()方法如下:
可以看到在发送消息时,先判断是否在同一Worker上。如果是的话,调用SendWorkerMessagesRequest的doRequest发送消息;否则使用WorkerClient(底层使用Netty)进行消息发送。下面着重讨论同一Worker内的机制。
org.apache.giraph.comm.requests.SendWorkerMessagesRequest类中的doRequest方法如下:
参数为该Worker的ServerData,代码中的partitionVertexData实际为PairList>workerMessages。遍历来添加到ServerData中的incomingMessageStore中。
ByteArrayMessagesPerVertexStore类中的addPartitionMessages()方法如下:
当用户使用了Combiner,incomingMessageStore对应的类型则为OneMessagePerVertexStore,该类为每个顶点只存储一个消息,而非消息队列。 结构如下图:
当添加一条消息时,会把顶点已对应的消息和要添加的消息调用combine()方法进行合并,然后存储在上述结构图中。addPartitionMessages()方法如下:
在ComputeCallable中的call()方法调用computePartition(Partition)计算完所有Partition上的顶点后,调用WorkerClientRequestProcessor.flush()方法把所有剩余的消息发送出去。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Protocol Buffers 开发者指南
欢迎来到 protocol buffers 的开发者指南。protocol buffers 是一个语言中立,平台中立针对通讯协议,数据存储和其他领域中对结构化数据进行序列化的扩展方法。 本文档主要针对的是 Java,C++ 或 Python 的开发人员希望在开发的应用程序中使用 Protocol Buffers。这个有关Protocol Buffers 摘要性的介绍将会告诉你如何开始使用Protocol Buffers。如果你希望更加深入的了解有关Protocol Buffers 的内容,你可以进入tutorials或者protocol buffer encoding页面来详细了解。 有关 API 的参考文档,请参考页面:reference documentation这里提供了所有这 3 种语言的参考,同时也针对.protolanguage和style提供相关的指南。 什么是 Protocol Buffers? Protocol buffers 是对结构化数据序列化的一个灵活,高效,自动化工具 —— 你可以将Protocol buffers 想象成 XML,但是体积更小,更快也更加简单...
- 下一篇
申通快递单号查询api接口免费对接调用
申通物流轨迹查询-使用的物流单号和快递单号即可实现查询物流信息。 目前提供的快递查询接口有免费版和收费版,目前比较常用的是菜鸟和快递鸟接口。 快递鸟接口免费不限量对接 接口规则 (1)、查询接口支持按照运单号查询(单个查询,并发不超过10个/S)。 (2)、指定的物流运单号选择相应的快递公司编码,格式不对或则编码错误都会返失败的信息。如EMS物流单号应选择快递公司编码(EMS) (3)、返回的物流跟踪信息按照发生的时间升序排列。 (4)、接口指令1002。 (5)、请求地址:http://api.kdniao.cc/Ebusiness/EbusinessOrderHandle.aspx (6)、接口提供:快递鸟 系统级和应用级输入参数返回结果参数 JAVAdemo import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingE...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)