Storm Topology及分组原理
Storm的通信机制,需要满足如下一些条件以满足Storm的语义。
1、建立数据传输的缓冲区。在通信连接没有建立之前把发送的数据缓存起来。数据发送方可以在连接建立之前发送消息,而不需要等连接建立起来,可是的接收方是独立运行的。
2、在消息传输层保证消息最多只能发送一次,Storm系统有ACK机制,是的没有被发送成功的消息会被重发,若消息层面也重发,会导致消息发送多次。
这种消息机制由两个接口来定义,backtype.storm.messaging.IContext和backtype.storm.messaging.IConnection.
IContext负责客户端和服务器端建立的连接,主要有四个方法。
1、prepare(Map stormConf):总从Storm定义的prepare方法,可以接收storm的配置。
2、term():终止,方法会在worker卸载这个传输插件的时候调用,自定义实现时可以在这里释放占用的资源。
3、bind(String topologyId,int port):建立服务器端的连接。
4、connect(String stormId,String host,int port):建立一个客户端的连接。
IConnect定义了在IContext上发送、接收数据的接口。
1、recv(int flag):接收消息。
2、send(int taskId,byte[] payload):发送消息。
3、close():该连接关闭的时候调用,释放相关资源。
Topology原理整理
从运行时Topology的实际执行过程角度,作业是由多个组件的实例,也即任务,按照构造时简历的逻辑顺序呢和配置的并发度,形成的数据流图结构。
流(stream)是Storm中对传递的数据进行的抽象,流是时间上无限的数据项Tuple序列。Spout是Stream的源,为Topology从特定数据源获取数据项,并向作业中发射(emit)形成Stream。(项目中使用了kafkaspout,接收后进行数据校验再使用emit发送给bolt),bolt可以同时接受任意多个上游送达的Stream作为输入,进行数据的处理过程,也可以在bolt做完处理后执行(emit)发射新的Stream继续给下游的Bolt进行处理。
Stream中的Tuple可以被指定结构,由一个或多个域(field)组成。Tuple的定义不必是严格统一的,而是可以在每个spout,bolt中定义。默认情况下Tuple可以包含基本类型,如integers、longs、shorts、bytes、strings、doubles、floats、booleans和byte arrays.
流组模式
1、Shuffle Grouping 随机分组
public void createTopology(TopologyBuilder builder){ kafkaSpout kafkaspout = getKafkaSpout(topicName); //Topology中增加一个Spout builder.setSpout(...) //在Topology中增加一个Bolt,可设置并行度,以随机分组的方式发送,shuffleGrouping后的参数为源组建的Id builder.setBolet(boltName,new BlackListBolt(),3).shuffleGrouping(spoutName);
}
在这种流组模式下,源组件将其发送的数据项,以随机的方式向其所有目标组件发送,可以保证每个目标组件收到数量近似的Tuple。
2、All Grouping 副本分组
//allGrouping(java.lang.String componentId) //allGrouping(java.lang.String componentId,java.lang.String streamId) //参数streamId是声明的流的标识 builder.setBolet(boltName,new BlackListBolt(),3).allGrouping(spoutName,"signals“);
在这种模式下,源组件将其发送的数据项,以副本的形式向其所有目标组件发送,可以保证每个目标组件均收到同一个Tuple,就好比zookeeper的配置文件同步一样,每个bolt都会收到同一份。
3、Global Grouping 全局分组
这种模式下,源组件将其发送的数据项,全部发送给目标组件的某一个实例,而且该实例是这个组件中ID最小的那个任务。可以保证所有数据项只会被目标组件的一份实例(一个bolt)所处理
builder.setBolet(boltName,new BlackListBolt(),3).globalGrouping(SpoutName);
4.Fiellds Grouping 按域分组
builder.setBolet(boltName,new BlackListBolt(),3).fieldsGrouping(spoutName,new Field("域名");
源组件将其发送的数据项,按Tuple中指定域的值分组,向下游目标组件发送,可以保证拥有相同域组合的值的Tuple,被发送给同一个Bolt.
5、Direct Grouping 直接分组
builder.setSpout("kafkaSpout",topicSpout) builder.setBolt(boltname1,new boltName1(),1).shuffleGrouping("kafkaSpout"); //以直接分组的模式接收上述bolt发送的数据项 builder.setBolt(boltname2,new boltname2(),2).directGrouping(boltname1);
源组件将其发送的数据项,以直接指定目标组件的方式发送,可以使指定组件接收给定的Tuple.需要注意的是,接收bolt的executle()函数中,哟啊使用emitDirect()替代emit,用于向指定的具名流中发送数据项
构建Topology
构建TopologyBuilder主要给出了三类方法:创建Topology、增加bolt和增加Spout的方法。setBolt和setSpout接口各有不同多种重载方法,均返回用于声明组件输入的对象。
1、id:组件(spout、Bolt)的标识,字符串类型,若需要引用该组件,就使用这里指定的标识ID。比如使用"kafkaSpout"
2、bolt:添加的bolt对象,再setBolt的重载方法中,存在IRichBolt和IBasicBolt两类bolt参数,项目中用到的是IRichBolt,区别在于,BasicBolt用于非聚集处理,能够自动进行(anchoring)和(acking)
3、spout:添加的Spout对象,在setSpout方法中该参数是IRichSpout类型的Spout接口。
4、parallelism_hint:并行度,数值型参数。设置组件运行时将要被分配的线程数量。
参考:《Storm 大数据流式计算及应用实践》

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MapReduce编程job概念原理
在Hadoop中,每个MapReduce任务都被初始化为一个job,每个job又可分为两个阶段:map阶段和reduce阶段。这两个阶段分别用两个函数来表示。Map函数接收一个<key,value>形式的输入,然后同样产生一个<ey,value>形式的中间输出,Hadoop会负责将所有具有相同中间key值的value集合在一起传递给reduce函数,reduce函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式。 简易代码: public static class Map extends MapReduceBase implments Mapper<LongWritable,Text,Text,IntWritable>{ //设置常量1,用来形成<word,1>形式的输出 private fianll static IntWritable one = new IntWritabl...
- 下一篇
如何在eclipse调试mapreduce程序
如何在eclipse或myeclipse调试mapreduce程序,这个可能是初学mr程序者碰到的一个难题 在hadoop1.2.1后,在下载的源代码中找不到hadoop-eclipse-plugin相关的jar或源代码。 其实hadoop目前使用maven进行源代码的管理与调试,可以参考文献: http://blog.cloudera.com/blog/2012/08/developing-cdh-applications-with-maven-and-eclipse/ A sample POM for setting up a basic Maven project for CDH application development https://gist.github.com/jnatkins/3517129 注意:CDH是hadoop的封装版本,很稳定,并且更新也很快。 如果需要在eclipse下编写MR程序并进行调试,需要以下前提条件: 1:安装maven,建议安装maven3.0.4或上以版本 2:使用eclipse较新的版本,如Kepler Service Release ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7设置SWAP分区,小内存服务器的救世主
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS关闭SELinux安全模块
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装