task hook
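Hooks let users run extra logic whenever certain task events occur.
The events currently defined are emit, cleanup, spoutAck, and so on (the full set is the list of callbacks in ITaskHook below).
A user only needs to write a class that implements ITaskHook and add its fully qualified class name to the TOPOLOGY-AUTO-TASK-HOOKS entry of the topology config, i.e. (storm-conf TOPOLOGY-AUTO-TASK-HOOKS).
When one of these events occurs, the system automatically calls the corresponding method on every registered hook.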
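package backtype.storm.hooks;

import backtype.storm.hooks.info.*;
import backtype.storm.task.TopologyContext;
import java.util.Map;

public interface ITaskHook {
    void prepare(Map conf, TopologyContext context);  // once, when the hook is registered
    void cleanup();                                    // when the task shuts down
    // event callbacks: each receives an info object describing the event
    void emit(EmitInfo info);
    void spoutAck(SpoutAckInfo info);
    void spoutFail(SpoutFailInfo info);
    void boltExecute(BoltExecuteInfo info);
    void boltAck(BoltAckInfo info);
    void boltFail(BoltFailInfo info);
}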
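package backtype.storm.hooks.info;

import java.util.Collection;
import java.util.List;

// Describes one emit: the tuple values, the stream, the emitting task and the target tasks
public class EmitInfo {
    public List<Object> values;          // values of the emitted tuple
    public String stream;                // id of the stream the tuple was emitted on
    public int taskId;                   // id of the emitting task
    public Collection<Integer> outTasks; // ids of the tasks the tuple is sent to

    public EmitInfo(List<Object> values, String stream, int taskId, Collection<Integer> outTasks) {
        this.values = values;
        this.stream = stream;
        this.taskId = taskId;
        this.outTasks = outTasks;
    }
}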
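To make the contract concrete, here is a minimal sketch of a user hook that counts emitted tuples per stream. So that it compiles without Storm on the classpath, it declares hypothetical stand-ins: EmitInfo mirrors the fields of the class above, and EmitHook is a hypothetical interface reduced to prepare/emit/cleanup (with a typed Map instead of the raw one). Against real Storm you would implement ITaskHook itself and list the class name under TOPOLOGY-AUTO-TASK-HOOKS.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EmitCountingHookDemo {
    // Hypothetical stand-in mirroring the EmitInfo fields shown above.
    public static class EmitInfo {
        public final List<Object> values;
        public final String stream;
        public final int taskId;
        public final Collection<Integer> outTasks;

        public EmitInfo(List<Object> values, String stream, int taskId, Collection<Integer> outTasks) {
            this.values = values;
            this.stream = stream;
            this.taskId = taskId;
            this.outTasks = outTasks;
        }
    }

    // Hypothetical reduced hook interface: only the callbacks this example needs.
    public interface EmitHook {
        void prepare(Map<String, Object> conf);
        void emit(EmitInfo info);
        void cleanup();
    }

    // Counts emitted tuples per stream id; a real hook would implement ITaskHook instead.
    public static class EmitCountingHook implements EmitHook {
        private final Map<String, Integer> countsByStream = new HashMap<>();

        @Override
        public void prepare(Map<String, Object> conf) {
            countsByStream.clear();
        }

        @Override
        public void emit(EmitInfo info) {
            countsByStream.merge(info.stream, 1, Integer::sum);
        }

        @Override
        public void cleanup() {
            System.out.println("emit counts: " + countsByStream);
        }

        public int count(String stream) {
            return countsByStream.getOrDefault(stream, 0);
        }
    }

    public static void main(String[] args) {
        EmitCountingHook hook = new EmitCountingHook();
        hook.prepare(new HashMap<>());
        hook.emit(new EmitInfo(List.of("a"), "default", 1, List.of(2, 3)));
        hook.emit(new EmitInfo(List.of("b"), "default", 1, List.of(2)));
        hook.emit(new EmitInfo(List.of("c"), "errors", 1, List.of(4)));
        hook.cleanup();
    }
}
```

Keeping the counters inside the hook instance works because, as described below, each task creates its own hook objects.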
1. add hook
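When mk-task builds a task, it reads the hook class names from the TOPOLOGY-AUTO-TASK-HOOKS entry of storm-conf,
instantiates each class, and adds the instance to the _hooks list of the task's TopologyContext via addTaskHook, which calls the hook's prepare first.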
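(defn mk-task [executor-data task-id]
  ;; ... task-data and storm-conf come from bindings omitted in this excerpt
  ;; for each configured class name: load the class, instantiate it, register it with the task's TopologyContext
  (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
    (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
  ;; ... rest of mk-task omitted
  )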
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    // ... other fields and methods omitted
    private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();

    // prepare() runs before the hook is stored, so every registered hook is already initialized
    public void addTaskHook(ITaskHook hook) {
        hook.prepare(_stormConf, this);
        _hooks.add(hook);
    }

    public Collection<ITaskHook> getHooks() {
        return _hooks;
    }
}
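The registration path (class name in the config, reflective instantiation, prepare, then add to the list) can be sketched in plain Java. Everything here is a hypothetical stand-in: TaskHook for ITaskHook, Context for the hook-holding part of TopologyContext, and registerHooks for the doseq in mk-task. The Clojure code calls Class.newInstance, which is deprecated since Java 9; the sketch uses getDeclaredConstructor().newInstance() instead, so each hook class still needs a public no-arg constructor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HookRegistryDemo {
    // Hypothetical minimal hook contract: prepare() receives the config before the hook is stored.
    public interface TaskHook {
        void prepare(Map<String, Object> conf);
    }

    // Example hook that only records whether prepare() ran.
    public static class RecordingHook implements TaskHook {
        public boolean prepared = false;

        @Override
        public void prepare(Map<String, Object> conf) {
            prepared = true;
        }
    }

    // Hypothetical stand-in for the hook-holding part of TopologyContext.
    public static class Context {
        private final Map<String, Object> conf;
        private final List<TaskHook> hooks = new ArrayList<>();

        public Context(Map<String, Object> conf) {
            this.conf = conf;
        }

        // Same order as addTaskHook: prepare first, then register.
        public void addTaskHook(TaskHook hook) {
            hook.prepare(conf);
            hooks.add(hook);
        }

        public List<TaskHook> getHooks() {
            return Collections.unmodifiableList(hooks);
        }
    }

    // Load each class by its binary name and instantiate it with the public no-arg constructor.
    // A class that does not implement TaskHook fails with ClassCastException.
    public static void registerHooks(Context context, List<String> classNames)
            throws ReflectiveOperationException {
        for (String name : classNames) {
            Object instance = Class.forName(name).getDeclaredConstructor().newInstance();
            context.addTaskHook((TaskHook) instance);
        }
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Context context = new Context(new HashMap<>());
        registerHooks(context, List.of(RecordingHook.class.getName()));
        System.out.println(context.getHooks().size() + " hook(s) registered");
    }
}
```

Because instantiation happens per task, two tasks configured with the same class name each get their own hook object.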
2. apply hook
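When an event occurs, the hooks registered earlier are invoked.
The example below shows the emit case.
The apply-hooks macro is simple: it takes the hook list from the topology context and, only if the list is non-empty, evaluates the info form (here an EmitInfo) once and calls the given method (here .emit) with it on every hook.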
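;; call site in the emit path: notify every hook with an EmitInfo for this tuple
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))

(defmacro apply-hooks [topology-context method-sym info-form]
  ;; type-hint the generated hook symbol as ITaskHook so the method call is not reflective
  (let [hook-sym (with-meta (gensym "hook") {:tag 'backtype.storm.hooks.ITaskHook})]
    `(let [hooks# (get-context-hooks ~topology-context)]
       ;; when there are no hooks, info-form is never evaluated (no EmitInfo is allocated)
       (when-not (hooks-empty? hooks#)
         (let [info# ~info-form]
           (fast-list-iter [~hook-sym hooks#]
             (~method-sym ~hook-sym info#)))))))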
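Since apply-hooks is a macro, the info form is only evaluated when at least one hook exists. A rough Java analogue, assuming hypothetical names (ApplyHooksDemo, EmitHook, applyHooks), expresses that laziness with a Supplier and the method symbol with a BiConsumer; a plain loop stands in for fast-list-iter. This is an illustration of the pattern, not Storm code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class ApplyHooksDemo {
    // Hypothetical single-callback hook standing in for ITaskHook.emit.
    public interface EmitHook {
        void emit(String info);
    }

    // Build the info object at most once, and only when hooks exist, then pass it to every hook.
    public static <H, I> void applyHooks(List<H> hooks, BiConsumer<H, I> method, Supplier<I> infoForm) {
        if (hooks.isEmpty()) {
            return; // like (when-not (hooks-empty? ...)): the info is never built
        }
        I info = infoForm.get();
        for (H hook : hooks) {
            method.accept(hook, info); // like (~method-sym ~hook-sym info#)
        }
    }

    public static void main(String[] args) {
        List<EmitHook> hooks = new ArrayList<>();
        hooks.add(info -> System.out.println("hook 1 saw " + info));
        hooks.add(info -> System.out.println("hook 2 saw " + info));
        ApplyHooksDemo.<EmitHook, String>applyHooks(hooks, EmitHook::emit, () -> "tuple [a] on stream default");
    }
}
```

The Supplier matters on a hot path like emit: topologies without hooks pay only for the emptiness check, not for allocating an info object per tuple.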
This article was excerpted from cnblogs (博客园); original publication date: 2013-07-30.