Storm之Collector-p1
IBasicOutputCollector.java
List<Integer> emit(String streamId, List<Object> tuple);
提交一系列的tuple,返回接收到这些tuple的taskId
void emitDirect(int taskId, String streamId, List<Object> tuple);
直接向某个task提交一系列的tuple
BasicOutputCollector.java
这个类里封装了一个OutputCollector的代理和一个inputTuple
OutputCollector是在构造函数里传入的,在Bolt处理完tuple之后调用此类的emit方法时,方法内部会调用封装的OutputCollector来进行emit,
最终的emit是OutputCollector的emit
此类还提供了两个emit方法的重载,目的是在没有指定streamId的时候提供一个默认名为“default”的streamId.
此类的emit方法没有提供anchors参数,每次bolt执行完之后进行emit时会自动将输入tuples和输出tuples关联,如果不需要关联,则可不用此类。
IOutputCollector.java
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
提交一系列的tuple,返回接收到这些tuple的taskId,anchors参数是指接收到的tuples
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
直接向某个task提交一系列tuple,同样的也会附带上输入的tuple
void ack(Tuple input);
ack某个tuple
void fail(Tuple input);
fail某个tuple
OutputCollector.java
封装了一个IOutputCollector的代理,该代理在构造函数时传递进来被初始化。
提供了多个emit方法的重载,基本上包括有(单个anchor,无anchor,无指定streamId,只有输出的tuple)这些
emitDirect重载的方式也基本上一样,都是为了使用方便来做的。
当然,最终的emit,ack和fail都是通过代理来实现的。
CoordinatedOutputCollector.java
这个类比较奇葩,是定义在CoordinatedBolt的内部类,只有CoordinatedBolt这个类使用。
封装了一个IOutputCollector代理,该代理在构造函数时被初始化。
此类没有重载emit和emitDirect方法,但是在emit和emitDirect方法内部会调用一个名为updateTaskCounts的方法
private void updateTaskCounts(Object id, List<Integer> tasks) { synchronized(_tracked) { TrackingInfo track = _tracked.get(id); if (track != null) { Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples; for(Integer task: tasks) { int newCount = get(taskEmittedTuples, task, 0) + 1; taskEmittedTuples.put(task, newCount); } } } }
这个方法主要是更新目标task和向其发送的tuple数量关系,其关系维护在_tracked变量里,关系链为
tuple_id —> task_id —> num
public void ack(Tuple tuple) { Object id = tuple.getValue(0); synchronized(_tracked) { TrackingInfo track = _tracked.get(id); if (track != null) track.receivedTuples++; } boolean failed = checkFinishId(tuple, TupleType.REGULAR); if(failed) { _delegate.fail(tuple); } else { _delegate.ack(tuple); } }
将收到的tupleID对应的跟踪信息中receivedTuples(已接收数量)+1 ,然后检查是否已经处理完该tupleID对应的任务,如果检查失败就fail回上一个bolt
TupleType.REGULAR是为了保证不是传递ID的也不是传递数量的流。
public void fail(Tuple tuple) { Object id = tuple.getValue(0); synchronized(_tracked) { TrackingInfo track = _tracked.get(id); if (track != null) track.failed = true; } checkFinishId(tuple, TupleType.REGULAR); _delegate.fail(tuple); }
设置跟踪信息failed,然后checkFinishId方法中会fail所有应该ack的tuples,然后删除这个tupleID对应的跟踪信息

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Storm之Bolt-接口
IBolt: bolt接口类,定义了常用的几个接口,IBolt的实现类在client上被创建,然后序列化到拓扑里并被提交到集群的master上,之后nimbus会启动worker进行反序列化,调用prepare进行准备完毕之后就开始处理tuples 如果是在java里定义bolts ,建议实现IRichBolt.java接口类,IRichBolt.java同时继承了IComponent.java接口,提供了更多对拓扑进行操作的方法。 /** * 当集群中的worker初始化一个跟当前Bolt相关的task时候被调用,此方法提供和准备bolt执行时的环境. * @param stormConf 此bolt使用的storm配置,合并了本机和集群的配置,将会提供给topology * @param context task的上下文,可以获取taskId,componentId,input,output等 * @param collector 用于任意时刻提交bolt里的tuples,collector是线程安全的,应当保存在Bolt里. */ void prepare(Ma...
- 下一篇
DataXceiver error processing unknown operation src: /127.0.0.1:36479 ...
异常信息如下: 2015-12-0917:39:20,310ERRORdatanode.DataNode(DataXceiver.java:run(278))-hadoop07:50010:DataXceivererrorprocessingunknownoperationsrc:/127.0.0.1:36479dst:/127.0.0.1:50010 java.io.EOFException atjava.io.DataInputStream.readShort(DataInputStream.java:315) atorg.apache.hadoop.hdfs.protocol.datatransfer.Receiver.readOp(Receiver.java:58) atorg.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:227) atjava.lang.Thread.run(Thread.java:745) 原因: Ambari 每分钟会向datanode发送"ping"连接一下去确保...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Hadoop3单机部署,实现最简伪集群
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Windows10,CentOS7,CentOS8安装Nodejs环境
- MySQL8.0.19开启GTID主从同步CentOS8