Storm的BaseBasicBolt源码解析ack机制
我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。
在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。
在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);
那么我们来看看BasicBolt的源码是不是这样的,不能因为看到别人的帖子说是这样的,我们就这样任务,以讹传讹,我们要To see is to believe。
为了方便看源代码,我先上我们的继承类:
public class SplitSentenceBolt extends BaseBasicBolt { public void prepare(Map stormConf, TopologyContext context) { super.prepare(stormConf, context); }
//5:执行我们自己的逻辑处理方法,接收传入的参数。
public void execute(Tuple input, BasicOutputCollector collector) { String sentence = (String)input.getValueByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); word = word.toLowerCase(); collector.emit(new Values(word,1));//这个地方就是调用OutputCollector的包装类,来发消息 } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","num")); } }
通过打断点,我们发现,bolt的task会创建这个类下面会标准执行顺序
public class BasicBoltExecutor implements IRichBolt { public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class); private IBasicBolt _bolt; private transient BasicOutputCollector _collector; //1:创建该对象,然后把我们写的SplitSentenceBolt对象赋给父类IBasicBolt。 public BasicBoltExecutor(IBasicBolt bolt) { _bolt = bolt; } public void declareOutputFields(OutputFieldsDeclarer declarer) { _bolt.declareOutputFields(declarer);//这里就是调用SplitSentenceBolt对象的方法了。 } //2:给BasicOutputCollector _collector字段赋值,BasicOutputCollector就是对OutputCollector类的包装。 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _bolt.prepare(stormConf, context); _collector = new BasicOutputCollector(collector); } //3:然后程序执行该方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /] public void execute(Tuple input) { _collector.setContext(input);//把接收到的tuple值设置给BasicOutputCollector中inputTuple字段。 try { _bolt.execute(input, _collector);//这个地方是调用我们实现类SplitSentenceBolt的ececute方法。 _collector.getOutputter().ack(input);//这个地方就是响应 } catch(FailedException e) { if(e instanceof ReportedFailedException) { _collector.reportError(e); } _collector.getOutputter().fail(input);//这个地方就是响应 } } public void cleanup() { _bolt.cleanup(); } public Map<String, Object> getComponentConfiguration() { return _bolt.getComponentConfiguration(); } }
public class BasicOutputCollector implements IBasicOutputCollector { private OutputCollector out; private Tuple inputTuple; public BasicOutputCollector(OutputCollector out) { this.out = out; }
//4:把收到的tuple数据赋值给inputTuple,这个时候BasicOutputCollector对象的字段都具有值了。
public void setContext(Tuple inputTuple) { this.inputTuple = inputTuple; }
//6:这里我们发送新的(转换后的)tuple数据,看他内部的调用,其实他也会发送一个anchor tuple来保持tracker链路,
而这个anchor tuple就是bolt接收到转换前的源tuple数据。
public List<Integer> emit(List<Object> tuple) {
return emit(Utils.DEFAULT_STREAM_ID, tuple);
} public List<Integer> emit(String streamId, List<Object> tuple) { return out.emit(streamId, inputTuple, tuple); } public void emitDirect(int taskId, String streamId, List<Object> tuple) { out.emitDirect(taskId, streamId, inputTuple, tuple); } public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } protected IOutputCollector getOutputter() { return out; } public void reportError(Throwable t) { out.reportError(t); } }
这里大家不要纠结bolt的启动时从哪里开始的,我后面会讲的,这里我们关注的是,BasicBoltExecutor对象创建后的执行过程,以这我们来看执行的过程。在BasicBoltExecutor的execute方法中,我们看到了ack和fail方法会被自动调用的,当我们的程序抛出异常则会执行fail方法的。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
[Presto]什么是Presto
1. Presto不是什么 虽然Presto一直被一些个人或者团体称为数据库,但是Presto并不是数据库。 千万不要以为Presto可以解析SQL,那么Presto就是一个标准的数据库。Presto并不是传统意义上的数据库。Presto并不是MySQL、PostgreSQL或者Oracle的代替品。Presto并不能用来处理在线事务。其实很多其他的数据库产品也是被用来设计为数据仓库或者数据分析工具,但是也不能处理在线事务。 2. Presto是什么 Presto通过使用分布式查询,可以快速高效的完成海量数据的查询。如果你需要处理TB或者PB级别的数据,那么你可能更希望借助于Hadoop和HDFS来完成这些数据的处理。作为Hive和Pig(Hive和Pig都是通过MapReduce的管道流来完成HDFS数据的查询)的替代者,Presto不仅可以访问HDFS,也可以操作不同的数据源,包括:RDBMS和其他的数据源(例如:Cassandra)。 Presto被设计为数据仓库和数据分析产品:数据分析、大规模数据聚集和生成报表。这些工作经常通常被认为是线上分析处理操作。 3. Presto系统...
- 下一篇
原来MaxCompute还能这么玩系列(2)—— 利用HiveServer2 Proxy实现MaxCompute与Hive生态工具的互通
注:MaxCompute原名ODPS,是阿里云自研的大数据计算平台,文中出现的MaxCompute与ODPS都指代同一平台,不做区分 什么是Hive Hive是一款经典的hadoop技术栈的数仓软件,可以让用户采用SQL来完成大数据量的计算分析。如果你对Hive还不熟悉,请移步Apache Hive官网获取进一步了解。MaxCompute在很多功能上与Hive相近,所以大部分MaxCompute的用户曾经也是Hive的用户。 什么是HiveServer2 既然提到HiveServer2,那得先介绍一下HiveServer1,我们通常也直接称之为HiveServer。HiveServer是基于Apache Thrift构建的一套服务,它支持远程客户端通过Thrift API向Hive提交请求。由于HiveServer1无法处理超过一个以上客
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8编译安装MySQL8.0.19
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Red5直播服务器,属于Java语言的直播服务器