Giraph 源码分析(五)—— 加载数据+同步总结
作者|白松
关于Giraph 共有九个章节,本文第五个章节。
环境:在单机上(机器名:giraphx)启动了2个workers。
输入:SSSP文件夹,里面有1.txt和2.txt两个文件。
1、在Worker向Master汇报健康状况后,就开始等待Master创建InputSplit。
方法:每个Worker通过检某个Znode节点是否存在,同时在此Znode上设置Watcher。若不存在,就通过BSPEvent的waitForever()方法释放当前线程的锁,陷入等待状态。一直等到master创建该znode。此步骤位于BSPServiceWorker类中的startSuperStep方法中,等待代码如下:
2、Master调用createInputSplits()方法创建InputSplit。
在generateInputSplits()方法中,根据用户设定的VertexInputFormat获得InputSplits。代码如下:
其中minSplitCountHint为创建split的最小数目,其值如下:
minSplitCountHint = Workers数目 * NUM_INPUT_THREADS
NUM_INPUT_THREADS表示 每个Input split loading的线程数目,默认值为1 。 经查证,在TextVertexValueInputFormat抽象类中的getSplits()方法中的minSplitCountHint参数被忽略。用户输入的VertexInputFormat继承TextVertexValueInputFormat抽象类。
如果得到的splits.size小于minSplitCountHint,那么有些worker就没被用上。
得到split信息后,要把这些信息写到Zookeeper上,以便其他workers访问。上面得到的split信息如下:
[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]
遍历splits List,为每个split创建一个Znode,值为split的信息。如为split-0创建Znode,值为:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0
为split-1创建znode(如下),值为:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1
最后创建znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都创建好了。
3、Master根据splits创建Partitions。首先确定partition的数目。
BSPServiceMaster中的MasterGraphPartitioner对象默认为HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:
上面代码中是在工具类PartitionUtils计算Partition的数目,计算公式如下:
partitionCount=PARTITION_COUNT_MULTIPLIER availableWorkerInfos.size() availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默认值为1 。
可见,partitionCount值为4(122)。创建的partitionOwnerList信息如下:
[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),
(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]
4、Master创建Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用于后面的exchange partition。
5、Master最后在assignPartitionOwners()方法中
把masterinfo,chosenWorkerInfoList,partitionOwners等信息写入Znode中(作为Znode的data),该Znode的路径为: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。
Master调用barrierOnWorkerList()方法开始等待各个Worker完成数据加载。调用关系如下:
barrierOnWorkerList中创建znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 。然后检查该znode的子节点数目是否等于workers的数目,若不等于,则线程陷入等待状态。后面某个worker完成数据加载后,会创建子node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)来激活该线程继续判断。
6、当Master创建第5步的znode后,会激活worker。
每个worker从znode上读出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,然后各个worker开始加载数据。
把partitionOwnerList复制给BSPServiceWorker类中的workerGraphPartitioner(默认为HashWorkerPartitioner类型)对象的partitionOwnerList变量,后续每个顶点把根据vertexID通过workerGraphPartitioner对象获取其对应的partitionOwner。
每个Worker从znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir获取子节点,得到inputSplitPathList,内容如下:
[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]
然后每个Worker创建N个InputsCallable线程读取数据。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS默认值为1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1
那么,默认每个worker就是创建一个线程来加载数据。
在InputSplitsHandler类中的reserveInputSplit()方法中,每个worker都是遍历inputSplitPathList,通过创建znode来保留(标识要处理)的split。代码及注释如下:
当用reserveInputSplit()方法获取某个znode后,loadSplitsCallable类的loadInputSplit方法就开始通过该znode获取其HDFS的路径信息,然后读入数据、重分布数据。
VertexInputSplitsCallable类的readInputSplit()方法如下:
7、每个worker加载完数据后,调用waitForOtherWorkers()方法等待其他workers都处理完split。
策略如下,每个worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目录下创建子节点,后面追加自己的worker信息,如worker1、worker2创建的子节点分别如下:
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2
创建完后,然后等待master创建/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。
8、从第5步骤可知,若master发现/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子节点数目等于workers的总数目,就会在coordinateInputSplits()方法中创建
_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告诉每个worker,所有的worker都处理完了split。
9、最后就是就行全局同步。
master创建znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,然后再调用barrierOnWorkerList方法检查该znode的子节点数目是否等于workers的数目,若不等于,则线程陷入等待状态。等待worker创建子节点来激活该线程继续判断。
每个worker获取自身的Partition Stats,进入finishSuperStep方法中,等待所有的Request都被处理完;把自身的Aggregator信息发送给master;创建子节点,如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data为该worker的partitionStatsList和workerSentMessages统计量;
最后调用waitForOtherWorkers()方法等待master创建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点。
master发现/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子节点数目等于workers数目后,根据/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子节点上的data收集每个worker发送的aggregator信息,汇总为globalStats。
Master若发现全局信息中(1)所有顶点都voteHalt且没有消息传递,或(2)达到最大迭代次数 时,设置 globalStats.setHaltComputation(true)。告诉works结束迭代。
master创建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点,data为globalStats。告诉所有workers当前超级步结束。
每个Worker检测到master创建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点后,读出该znode的数据,即全局的统计信息。然后决定是否继续下一次迭代。
10、同步之后开始下一个超级步。
11、master和workers同步过程总结。
(1)master创建znode A,然后检测A的子节点数目是否等于workers数目,不等于就陷入等待。某个worker创建一个子节点后,就会唤醒master进行检测一次。
(2)每个worker进行自己的工作,完成后,创建A的子节点A1。然后等待master创建znode B。
(3)若master检测到A的子节点数目等于workers的数目时,创建Znode B
(4)master创建B 节点后,会激活各个worker。同步结束,各个worker就可以开始下一个超步。
本质是通过znode B来进行全局同步的。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Apache NiFi之MySQL数据同步到HBase
一.说明 将Apache NiFi做为关系型数据与非关系型数据库的数据同步工具使用,在此场景中需要将mysql导出的avro数据格式转化为json入库HBase 二.开拔 Ⅰ).配置ExecuteSQLRecord a).选择ExecuteSQLRecord 在Processor中搜索ExecuteSQLRecord b).配置ExecuteSQLRecord 1.创建Database Connection Pool2.创建JsonRecordSetWriter3.配置SQL select query select Host,User,authentication_string from mysql.user; c).创建DBCPConnectionPool 在Database Connection Pool中选择DBCPConnectionPool d).配置DBCPConnectionPool 1.Database Connection URL: jdbc:mysql://hostname:3306/druid2.Database Driver Class Name: com.my...
- 下一篇
【干货合集 视频+资料下载】2019大数据技术公开课第三季 | 阿里巴巴大数据产品最新特性介绍
7月26日,阿里云飞天大数据平台在阿里云峰会上海站正式亮相。 飞天大数据平台是当前国内规模最大的计算平台。数据显示,目前飞天大数据平台可扩展至10万台计算集群,曾创下四项海量数据排序世界纪录。在阿里巴巴经济体中支撑了全局数据存储和计算,单日数据处理量从2015年100PB、2016年180PB、2017年320PB,到2018年超过600PB,仅用三年时间提升5倍。 2019大数据技术公开课第三季将为大家带来飞天大数据平台系列产品的最新特性介绍,扫描海报下方任一二维码进群,即可观看直播。 阿里巴巴大数据产品最新特性介绍—MaxCompute 8月13日 19:00-19:40 直播回看 >>>阅读文字版 >>>资料下载 >>>产品官网 >>>阿里巴巴大数据产品最新特
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果