Storm Source Code Analysis - Topology Submit - Task
mk-task is fairly simple. A task is only a conceptual structure; unlike a worker or an executor, it does not need its own process or thread.
So its core is really mk-task-data:
1. Create the TopologyContext object, which essentially merges the earlier topology object with worker-data, so that the task can access the topology information it needs while running.
2. Create the task object (spout-object or bolt-object), which encapsulates the corresponding logic, such as nextTuple or execute.
3. Generate tasks-fn. The name is misleading: it sounds as if it executes the task's work, but it actually just does the preparation before an emit, most importantly calling the grouper to produce the target tasks, plus some metrics and hook invocations.
Put plainly, mk-task does very little itself.
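The three steps of mk-task can be sketched in Python as follows. This is an illustrative sketch, not Storm's real API: mk_task_data, send_unanchored, and the UserContext class are simplified hypothetical stand-ins for the Clojure functions, and the config key string is only a placeholder for TOPOLOGY-AUTO-TASK-HOOKS.

```python
SYSTEM_STREAM_ID = "__system"

class UserContext:
    """Stand-in for TopologyContext's hook registry."""
    def __init__(self):
        self.hooks = []
    def add_task_hook(self, hook):
        self.hooks.append(hook)

def mk_task_data(executor_data, task_id):
    # stand-in for the real mk-task-data: only the fields used below
    return {"executor-data": executor_data,
            "task-id": task_id,
            "user-context": UserContext(),
            "sent": []}

def send_unanchored(task_data, stream, values):
    # stand-in: record the emit instead of routing it through the executor
    task_data["sent"].append((stream, values))

def mk_task(executor_data, task_id):
    task_data = mk_task_data(executor_data, task_id)
    # register the hooks configured via TOPOLOGY-AUTO-TASK-HOOKS
    for klass in executor_data["storm-conf"].get("topology.auto.task.hooks", []):
        task_data["user-context"].add_task_hook(klass())
    # announce startup on the system stream before executor threads start
    send_unanchored(task_data, SYSTEM_STREAM_ID, ["startup"])
    return task_data
```

The hook classes are instantiated reflectively in the real code (Class/forName then newInstance); here a plain class object stands in for that.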
```clojure
(defn mk-task [executor-data task-id]
  (let [task-data (mk-task-data executor-data task-id) ;; 1. mk-task-data
        storm-conf (:storm-conf executor-data)]
    (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)] ;; add the pre-configured hooks
      (.addTaskHook ^TopologyContext (:user-context task-data)
                    (-> klass Class/forName .newInstance)))
    ;; when this is called, the threads for the executor haven't been started yet,
    ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
    (send-unanchored task-data SYSTEM-STREAM-ID ["startup"]) ;; send "startup" on SYSTEM-STREAM; who consumes SYSTEM-STREAM…?
    task-data))
```
1 mk-task-data
```clojure
(defn mk-task-data [executor-data task-id]
  (recursive-map
    :executor-data executor-data
    :task-id task-id
    :system-context (system-topology-context (:worker executor-data) executor-data task-id)
    :user-context (user-topology-context (:worker executor-data) executor-data task-id)
    :builtin-metrics (builtin-metrics/make-data (:type executor-data))
    :tasks-fn (mk-tasks-fn <>)
    :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>))
                             (:component-id executor-data))))
```
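Note the `<>` placeholder: recursive-map lets a later entry (such as :object, which reads :system-context) refer to the map built so far. A minimal Python sketch of that idea, with recursive_map as a hypothetical re-implementation rather than Storm's helper:

```python
def recursive_map(*pairs):
    """Build a dict where each value function receives the
    partially-built map, playing the role of <> in recursive-map."""
    m = {}
    for key, fn in pairs:
        m[key] = fn(m)
    return m

# later entries can read earlier ones, like :object reading :system-context
task_data = recursive_map(
    ("task-id",        lambda m: 7),
    ("system-context", lambda m: {"raw-topology": "topology-for-task-%d" % m["task-id"]}),
    ("object",         lambda m: m["system-context"]["raw-topology"]),
)
```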
1.1 TopologyContext
See: Storm Source Code Analysis - Topology Submit - Task - TopologyContext
:system-context and :user-context differ only in the topology object held by the context: the system context uses the system-topology! version.
1.2 builtin-metrics/make-data
The builtin-metrics here are used to record metrics about the spout's or bolt's execution.
1.3 mk-tasks-fn
Returns tasks-fn. This function mainly does the preparation work before an emit and returns the list of target tasks:
1. Call the grouper to produce the target tasks
2. Run the emit hooks
3. When the sampling condition is met, update stats and builtin-metrics
tasks-fn has two arities:
[^String stream ^List values] — the easier version to understand: it gathers the target tasks of every component subscribed to the stream (one stream may feed several output components, so one piece of data may need to go to several bolts).
[^Integer out-task-id ^String stream ^List values] — the caller specifies out-task-id, i.e. direct grouping.
This arity validates out-task-id: in out-task-id (if grouping out-task-id), out-task-id is kept only when the stream->component->grouper entry for the target component is non-nil (it should be :direct), i.e. the stream really does reach the component that owns out-task-id.
If the validation fails, out-task-id is set to nil.
```clojure
(defn mk-tasks-fn [task-data]
  (let [task-id (:task-id task-data)
        executor-data (:executor-data task-data)
        component-id (:component-id executor-data)
        ^WorkerTopologyContext worker-context (:worker-context executor-data)
        storm-conf (:storm-conf executor-data)
        emit-sampler (mk-stats-sampler storm-conf)
        stream->component->grouper (:stream->component->grouper executor-data) ;; see "Storm Source Code Analysis - Streaming Grouping"
        user-context (:user-context task-data)
        executor-stats (:stats executor-data)
        debug? (= true (storm-conf TOPOLOGY-DEBUG))]
    (fn
      ([^Integer out-task-id ^String stream ^List values]
        (when debug?
          (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
        (let [target-component (.getComponentId worker-context out-task-id)
              component->grouping (get stream->component->grouper stream)
              grouping (get component->grouping target-component)
              out-task-id (if grouping out-task-id)]
          (when (and (not-nil? grouping) (not= :direct grouping))
            (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
          (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
          (when (emit-sampler)
            (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
            (stats/emitted-tuple! executor-stats stream)
            (if out-task-id
              (stats/transferred-tuples! executor-stats stream 1)
              (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
          (if out-task-id [out-task-id])))
      ([^String stream ^List values]
        (when debug?
          (log-message "Emitting: " component-id " " stream " " values))
        (let [out-tasks (ArrayList.)]
          (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
            (when (= :direct grouper)
              ;; TODO: this is wrong, need to check how the stream was declared
              (throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
            (let [comp-tasks (grouper task-id values)] ;; run the grouper to produce the target tasks
              (if (or (sequential? comp-tasks) (instance? Collection comp-tasks))
                (.addAll out-tasks comp-tasks)
                (.add out-tasks comp-tasks))))
          (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) ;; run the registered emit hooks
          (when (emit-sampler) ;; when the sample condition is met, update emitted/transferred in stats and builtin-metrics
            (stats/emitted-tuple! executor-stats stream)
            (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
            (stats/transferred-tuples! executor-stats stream (count out-tasks))
            (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream (count out-tasks)))
          out-tasks)))))
```
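The control flow of the two arities (grouper lookup, direct-grouping validation, target-task collection) can be sketched in Python. This is a hedged simplification: stream->component->grouper is modeled as a nested dict, task_to_component stands in for worker-context's getComponentId, a grouper is either the string "direct" or a callable, and the hooks/metrics calls are left out.

```python
def mk_tasks_fn(task_id, task_to_component, stream_to_component_to_grouper):
    def tasks_fn(stream, values, out_task_id=None):
        component_to_grouper = stream_to_component_to_grouper.get(stream, {})
        if out_task_id is not None:
            # direct arity: keep out_task_id only if the stream really reaches
            # the component that owns it, and with a :direct grouping
            target_component = task_to_component.get(out_task_id)
            grouping = component_to_grouper.get(target_component)
            if grouping is not None and grouping != "direct":
                raise ValueError("Cannot emitDirect to a task expecting a regular grouping")
            return [out_task_id] if grouping is not None else []
        # regular arity: run every component's grouper for this stream
        out_tasks = []
        for component, grouper in component_to_grouper.items():
            if grouper == "direct":
                raise ValueError("Cannot do regular emit to direct stream")
            result = grouper(task_id, values)
            out_tasks.extend(result if isinstance(result, list) else [result])
        return out_tasks
    return tasks_fn
```

Note how the validation mirrors the Clojure: an unknown out_task_id simply yields an empty target list rather than an error, while a non-direct grouping on a direct emit throws.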
1.4 get-task-object
Fetches the component's object.
For a Spout, for example, it extracts the ComponentObject spout_object from the SpoutSpec, which contains the spout's logic, such as nextTuple().
```clojure
(defn- get-task-object [^TopologyContext topology component-id]
  (let [spouts (.get_spouts topology)
        bolts (.get_bolts topology)
        state-spouts (.get_state_spouts topology)
        obj (Utils/getSetComponentObject
              (cond
                (contains? spouts component-id) (.get_spout_object ^SpoutSpec (get spouts component-id))
                (contains? bolts component-id) (.get_bolt_object ^Bolt (get bolts component-id))
                (contains? state-spouts component-id) (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id))
                true (throw-runtime "Could not find " component-id " in " topology)))
        obj (if (instance? ShellComponent obj)
              (if (contains? spouts component-id)
                (ShellSpout. obj)
                (ShellBolt. obj))
              obj)
        obj (if (instance? JavaObject obj)
              (thrift/instantiate-java-object obj)
              obj)]
    obj))
```
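The lookup order above can be sketched in Python. Assumptions: the topology is modeled as plain dicts, and the ShellComponent/JavaObject unwrapping steps are omitted; only the spouts/bolts/state-spouts dispatch and the not-found error are shown.

```python
def get_task_object(topology, component_id):
    """Find component_id's object in spouts, then bolts, then
    state-spouts, mirroring the cond in get-task-object."""
    for section in ("spouts", "bolts", "state-spouts"):
        if component_id in topology.get(section, {}):
            return topology[section][component_id]
    raise RuntimeError("Could not find %s in %s" % (component_id, topology))
```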
This article is excerpted from 博客园 (cnblogs); original publication date: 2013-07-31.