Storm的数据处理编程单元:Bolt 学习整理
Bolt是Topology中的数据处理的单元,也是Storm针对处理过程的编程单元。Topology中所有的处理都是在这些Bolt中完成的,编程人员可以实现自定义的处理过程,例如,过滤、函数、聚集、连接等计算。如果是复杂的计算过程,往往需要多个步骤和使用多个Bolt。
Bolt可以将数据项发送至多个数据流(Stream)。编程人员首先可以使用OutputFieldsDeclarer类的declareStream()方法来声明多个流,指定数据将要发送到的流,然后使用SpoutOutputCollector的emit方法将数据发送。
当声明了一个Bolt的输入流后,可以从其他的组件中接收这些指定的流。当接收某个组件的所有流时,需要在程序中逐个声明接收的过程。InputDeclarer对象默认接收来自某组件默认的流。
//从名称为"1"的组件中接收默认的流。 declarer.shuffleGrouping("1")
IBolt 和 IComponent接口
IBolt接口:
//在组件的任务初被初始化时,由集群中的工作进程(worker)调用,prepare()用于实例化Bolt的已给运行时任务,被集群中的某一个进程调用,提供Bolt运行的环境。
//sormConf对象维护Storm中针对该Bolt的配置信息。(来自Topology);context对象是一个上下文对象,用于获取该组件运行时任务的信息。(例如Topology中该Bolt所有任务的位置,包括任务的id、组件id和输入输出信息等)
//collector对象用于从该Bolt发送数据项。数据项可以在任意时刻发送,包括调用open()和close()方法。
void prepare(java.util.Map stormConf,TopologyContext context,OutputCollector collector) //接收一个数据项并处理
//该方法用来接收一个数据项(Tuple),并可以将处理的结果作为新的数据项发送(emit),是Bolt需要实现的最重要的方法。
//参数imput是一个数据项对象,包含了众多的元数据(metadata),包括它来自的组件、流、任务等。数据项中的值,可以通过Tuple类的getValue()方法获得。
void execute(Tuple input) //在IBolt将关闭时调用 void cleanup()
Tuple类的方法,这个类的对象作为execute()方法的输入。(方法举例: int size() ; int fieldIndex(java.lang.String field) ; ......)
方法众多,可以整理分为以下五类:
1、获取属性的方法。 (size()、fieldIndex()和contains()三个方法)
2、获取元数据的方法。(getMessageId()、getSourceComponent()、getSourceTask()、getSourceStreamId()和getSourceGlobalStreamid()方法)
其中MessageId是在数据项被创建时,通过一定的规则赋值的。
3、根据域获取值的方法。(getValue()和多个get具体数据类型的方法)
4、根据域的名称获取值的方法。(这一类包括getFields()、getValues()和select()方法)
5、获取Tuple的值或域列表的方法。(getFields()、getValues()和select()方法)
分别获取该数据项的所有域列表、值列表和值列表子集。
简单的案例:
class SplitSentence implements IRichBolt { private OutputCollector collector; public void prepare(Map conf,TopologyContext context,OutputCollector collector){ this.collector = collector; } public void execute(Tuple tuple){ String sentence = tuple.getString(0); for(String word : sentence.split(" "){ collector.emit(new Values(word)); } } public void cleanup(){ } public void declareOutputFields(OutpuFieldsDeclarer declarer){ declarer.declare(new Fields("word")); } }
这里说下declareOutputFields()函数参数,声明了输出流的数据项的结构,也即Tuple的域。
结合上节给的Spout的示例,可以在Topology类的main函数中加入相关代码,增加Bolt。
Topology builder builder = new TopologyBuilder(); Builder.SetSpout ("SentenceGenSpout ",new TestWord Spout(),1); builder.setBolt("splitBoult",new SplitSentence(),2).shuffleGrouping("sentenceGenSpout");

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark学习之编程进阶——累加器与广播(5)
Spark学习之编程进阶——累加器与广播(5) 1. Spark中两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable)。累加器对信息进行聚合,而广播变量用来高效分发较大的对象。 2. 共享变量是一种可以在Spark任务中使用的特殊类型的变量。 3. 累加器的用法: 通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumlator[T]对象,其中T是初始值initialValue的类型。 Spark闭包里的执行器代码可以使用累加器的+=方法(在Java中是add)增加累加器的值。 驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue()来访问累加器的值。 Python中实现累加空行 file = sc.textFile(inputFile) #创建Accumulator[Int]并初始化为0 blankLines = sc.accumulator(0) def extractCa...
- 下一篇
Spark Streaming 的一些问题
Spark Streaming 的一些问题,做选型前关注这些问题可以有效的降低使用风险。 checkpoint checkpoint 是个很好的恢复机制。但是方案比较粗暴,直接通过序列化的机制写入到文件系统,导致代码变更和配置变更无法生效。实际场景是升级往往比系统崩溃的频率高太多。但是升级需要能够无缝的衔接上一次的偏移量。所以spark streaming在无法容忍数据有丢失的情况下,你需要自己记录偏移量,然后从上一次进行恢复。 我们目前是重写了相关的代码,每次记录偏移量,不过只有在升级的时候才会读取自己记录的偏移量,其他情况都是依然采用checkpoint机制。 Kafka 这个和Spark Streaming相关,也不太相关。说相关是因为Spark 对很多异常处理比较简单。很多是和Kafka配置相关的。我举个例子: 如果消息体太大了,超过 fetch.message.max.bytes=1m,那么Spark Streaming会直接抛出OffsetOutOfRangeException异常,然后停止服务。 对应的错误会从这行代码抛出: if (!iter.hasNext) { as...
相关文章
文章评论
共有0条评论来说两句吧...