First, a set of metric-related interfaces is defined: IMetric, IReducer, ICombiner (backtype.storm.metric.api).
Inside each task, a set of builtin metrics is created (backtype.storm.daemon.builtin-metrics) and registered into the topology context.
The task keeps updating these builtin metrics through functions such as spout-acked-tuple!.
Periodically, the task sends the statistics accumulated in the builtin metrics over METRICS-STREAM to the metrics bolt (backtype.storm.metric.MetricsConsumerBolt); that bolt creates an object implementing backtype.storm.metric.api.IMetricsConsumer and hands it the metrics.
So how are these metrics used?
Since they are builtin metrics, they are not consumed by the outside world directly.
How the metrics are handled depends on _metricsConsumer.handleDataPoints, where _metricsConsumer is configured through the topology's configuration.
For example, backtype.storm.metric.LoggingMetricsConsumer: with this consumer the metrics are written to the log.
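Before walking through the code, here is a hedged sketch of how a consumer such as LoggingMetricsConsumer is typically registered from the user side; the Config.registerMetricsConsumer helper and the 60-second bucket size are assumptions about the Storm version being discussed, not something quoted from this post.
import backtype.storm.Config;
import backtype.storm.metric.LoggingMetricsConsumer;
public class MetricsConsumerConfigExample {
    public static Config buildConf() {
        Config conf = new Config();
        // Flush builtin metrics every 60 seconds (assumed example value).
        conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 60);
        // Registers the consumer under TOPOLOGY_METRICS_CONSUMER_REGISTER with parallelism 1;
        // system-topology! later reads this entry to create a MetricsConsumerBolt for it.
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
        return conf;
    }
}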
1. backtype.storm.metric.api
IMetric
package backtype.storm.metric.api;
public interface IMetric {
public Object getValueAndReset(); // returns the current value and resets to the initial state
}
CountMetric: a counter; reset sets it back to zero.
AssignableMetric: holds a value assigned directly; nothing to reset.
MultiCountMetric: keeps multiple counts in a HashMap; on reset it calls getValueAndReset on each CountMetric (see the sketch after CountMetric below).
public class CountMetric implements IMetric {
long _value = 0;
public CountMetric() {
}
public void incr() {
_value++;
}
public void incrBy(long incrementBy) {
_value += incrementBy;
}
public Object getValueAndReset() {
long ret = _value;
_value = 0;
return ret;
}
}
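MultiCountMetric itself is not quoted in this post; the following is a minimal reconstruction based on the description above, so details may differ slightly from the actual Storm source.
import java.util.HashMap;
import java.util.Map;
public class MultiCountMetric implements IMetric {
    Map<String, CountMetric> _value = new HashMap<String, CountMetric>();
    // Returns (and lazily creates) the CountMetric for one scope, e.g. a stream id.
    public CountMetric scope(String key) {
        CountMetric val = _value.get(key);
        if(val == null) _value.put(key, val = new CountMetric());
        return val;
    }
    // Resets every scoped counter and returns a map of scope -> count.
    public Object getValueAndReset() {
        Map<String, Object> ret = new HashMap<String, Object>();
        for(Map.Entry<String, CountMetric> e : _value.entrySet()) {
            ret.put(e.getKey(), e.getValue().getValueAndReset());
        }
        return ret;
    }
}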
ICombiner
public interface ICombiner<T> {
public T identity();
public T combine(T a, T b);
}
CombinedMetric combines an ICombiner with an IMetric.
public class CombinedMetric implements IMetric {
private final ICombiner _combiner;
private Object _value;
public CombinedMetric(ICombiner combiner) {
_combiner = combiner;
_value = _combiner.identity();
}
public void update(Object value) {
_value = _combiner.combine(_value, value);
}
public Object getValueAndReset() {
Object ret = _value;
_value = _combiner.identity();
return ret;
}
}
IReducer
public interface IReducer<T> {
T init();
T reduce(T accumulator, Object input);
Object extractResult(T accumulator);
}
MeanReducer implements the IReducer interface to compute an average: reduce accumulates the sum and the count, and extractResult returns sum/count.
class MeanReducerState {
public int count = 0;
public double sum = 0.0;
}
public class MeanReducer implements IReducer<MeanReducerState> {
public MeanReducerState init() {
return new MeanReducerState();
}
public MeanReducerState reduce(MeanReducerState acc, Object input) {
acc.count++;
if(input instanceof Double) {
acc.sum += (Double)input;
} else if(input instanceof Long) {
acc.sum += ((Long)input).doubleValue();
} else if(input instanceof Integer) {
acc.sum += ((Integer)input).doubleValue();
} else {
throw new RuntimeException(
"MeanReducer::reduce called with unsupported input type `" + input.getClass()
+ "`. Supported types are Double, Long, Integer.");
}
return acc;
}
public Object extractResult(MeanReducerState acc) {
if(acc.count > 0) {
return new Double(acc.sum / (double)acc.count);
} else {
return null;
}
}
}
ReducedMetric
Combines an IReducer with an IMetric.
public class ReducedMetric implements IMetric {
private final IReducer _reducer;
private Object _accumulator;
public ReducedMetric(IReducer reducer) {
_reducer = reducer;
_accumulator = _reducer.init();
}
public void update(Object value) {
_accumulator = _reducer.reduce(_accumulator, value);
}
public Object getValueAndReset() {
Object ret = _reducer.extractResult(_accumulator);
_accumulator = _reducer.init();
return ret;
}
}
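As a small usage sketch (not taken from the original post), this is how a ReducedMetric wrapping a MeanReducer behaves across two reporting buckets:
public class ReducedMetricExample {
    public static void main(String[] args) {
        ReducedMetric avgLatency = new ReducedMetric(new MeanReducer());
        avgLatency.update(10L);  // latencies in ms; Long, Integer and Double are all accepted
        avgLatency.update(20L);
        avgLatency.update(30);
        // Drains the bucket: returns 20.0 and re-inits the accumulator.
        System.out.println(avgLatency.getValueAndReset()); // 20.0
        // An empty bucket yields null, because MeanReducer.extractResult returns null when count == 0.
        System.out.println(avgLatency.getValueAndReset()); // null
    }
}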
IMetricsConsumer
This interface nests the TaskInfo and DataPoint classes.
handleDataPoints is where the logic for handling a task's batch of DataPoints goes.
public interface IMetricsConsumer {
public static class TaskInfo {
public TaskInfo() {}
public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) {
this.srcWorkerHost = srcWorkerHost;
this.srcWorkerPort = srcWorkerPort;
this.srcComponentId = srcComponentId;
this.srcTaskId = srcTaskId;
this.timestamp = timestamp;
this.updateIntervalSecs = updateIntervalSecs;
}
public String srcWorkerHost;
public int srcWorkerPort;
public String srcComponentId;
public int srcTaskId;
public long timestamp;
public int updateIntervalSecs;
}
public static class DataPoint {
public DataPoint() {}
public DataPoint(String name, Object value) {
this.name = name;
this.value = value;
}
@Override
public String toString() {
return "[" + name + " = " + value + "]";
}
public String name;
public Object value;
}
void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);
void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
void cleanup();
}
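Below is a hedged sketch of a custom consumer built against the interface above; the class name and the println-based handling are illustrative only, not part of Storm.
import java.util.Collection;
import java.util.Map;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
public class StdoutMetricsConsumer implements IMetricsConsumer {
    @Override
    public void prepare(Map stormConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) { }
    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint p : dataPoints) {
            // One line per data point, tagged with its source component and task.
            System.out.println(taskInfo.srcComponentId + "[" + taskInfo.srcTaskId + "] " + p);
        }
    }
    @Override
    public void cleanup() { }
}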
2. backtype.storm.daemon.builtin-metrics
Defines the metrics needed by spouts and bolts: two records, BuiltinSpoutMetrics and BuiltinBoltMetrics, each essentially a map of [metric-name, metric-object].
(defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count
^MultiReducedMetric complete-latency
^MultiCountMetric fail-count
^MultiCountMetric emit-count
^MultiCountMetric transfer-count])
(defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count
^MultiReducedMetric process-latency
^MultiCountMetric fail-count
^MultiCountMetric execute-count
^MultiReducedMetric execute-latency
^MultiCountMetric emit-count
^MultiCountMetric transfer-count])
(defn make-data [executor-type]
(condp = executor-type
:spout (BuiltinSpoutMetrics. (MultiCountMetric.)
(MultiReducedMetric. (MeanReducer.))
(MultiCountMetric.)
(MultiCountMetric.)
(MultiCountMetric.))
:bolt (BuiltinBoltMetrics. (MultiCountMetric.)
(MultiReducedMetric. (MeanReducer.))
(MultiCountMetric.)
(MultiCountMetric.)
(MultiReducedMetric. (MeanReducer.))
(MultiCountMetric.)
(MultiCountMetric.))))
(defn register-all [builtin-metrics storm-conf topology-context]
(doseq [[kw imetric] builtin-metrics]
(.registerMetric topology-context (str "__" (name kw)) imetric
(int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
When mk-task-data runs, make-data is called to create the corresponding metrics:
:builtin-metrics (builtin-metrics/make-data (:type executor-data))
and in the executor's mk-threads these builtin metrics are registered into the TopologyContext:
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
That completes the creation and registration of the builtin metrics; next, a series of functions for updating them is defined.
Take spout-acked-tuple! as an example: it updates the MultiCountMetric ack-count and the MultiReducedMetric complete-latency.
.scope picks the CountMetric for the given stream out of the MultiCountMetric, then incrBy adds the stats rate to that count.
(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms]
(-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
(-> m .complete-latency (.scope stream) (.update latency-ms)))
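The registerMetric hook that register-all relies on is the same one available to user components; here is a hedged sketch of a bolt registering and updating its own CountMetric (the bolt name, metric name and 60-second bucket are illustrative assumptions, not from this post):
import java.util.Map;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
public class CountingBolt extends BaseRichBolt {
    private transient CountMetric _processed;
    private OutputCollector _collector;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        // Same registration path as builtin-metrics/register-all, just without the "__" prefix.
        _processed = context.registerMetric("processedTuples", new CountMetric(), 60);
    }
    @Override
    public void execute(Tuple input) {
        _processed.incr();
        _collector.ack(input);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}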
3. backtype.storm.metric
MetricsConsumerBolt
It creates an object implementing IMetricsConsumer and calls handleDataPoints in execute.
package backtype.storm.metric;
public class MetricsConsumerBolt implements IBolt {
IMetricsConsumer _metricsConsumer;
String _consumerClassName;
OutputCollector _collector;
Object _registrationArgument;
public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) {
_consumerClassName = consumerClassName;
_registrationArgument = registrationArgument;
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
try {
_metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance();
} catch (Exception e) {
throw new RuntimeException("Could not instantiate a class listed in config under section " +
Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
}
_metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector);
_collector = collector;
}
@Override
public void execute(Tuple input) {
_metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1));
_collector.ack(input);
}
@Override
public void cleanup() {
_metricsConsumer.cleanup();
}
}
SystemBolt
According to the comments, there is one SystemBolt per worker, and its task id is always -1.
It defines some system-level metrics and registers them into the TopologyContext.
It needs to call Clojure from Java, so the following classes are imported:
import clojure.lang.AFn;
import clojure.lang.IFn; // function interface
import clojure.lang.RT; // Clojure runtime
It also uses the java.lang.management classes for monitoring memory and the JVM:
java.lang.management.MemoryUsage, a snapshot of memory usage
java.lang.management.GarbageCollectorMXBean, the management interface for JVM garbage collection, e.g. the total number of collections and the accumulated collection time
java.lang.management.RuntimeMXBean, the management interface for the JVM's runtime system
The notable point of this bolt is that only prepare contains any logic, and _prepareWasCalled ensures prepare is executed only once per worker.
The logic in prepare mainly defines various metrics and registers them into the TopologyContext via registerMetric.
The metrics cover the JVM uptime, start time, memory usage, and the state of each garbage collector.
These registered system metrics are sent to the MetricsConsumerBolt for processing along with everything else.
Shouldn't this have been a spout? Why is it implemented as a bolt?
// There is one task inside one executor for each worker of the topology.
// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
// This bolt was conceived to export worker stats via metrics api.
public class SystemBolt implements IBolt {
private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
private static boolean _prepareWasCalled = false;
private static class MemoryUsageMetric implements IMetric {
IFn _getUsage;
public MemoryUsageMetric(IFn getUsage) {
_getUsage = getUsage;
}
@Override
public Object getValueAndReset() {
MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
HashMap m = new HashMap();
m.put("maxBytes", memUsage.getMax());
m.put("committedBytes", memUsage.getCommitted());
m.put("initBytes", memUsage.getInit());
m.put("usedBytes", memUsage.getUsed());
m.put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed());
m.put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed());
return m;
}
}
// canonically the metrics data exported is time bucketed when doing counts.
// convert the absolute values here into time buckets.
private static class GarbageCollectorMetric implements IMetric {
GarbageCollectorMXBean _gcBean;
Long _collectionCount;
Long _collectionTime;
public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
_gcBean = gcBean;
}
@Override
public Object getValueAndReset() {
Long collectionCountP = _gcBean.getCollectionCount();
Long collectionTimeP = _gcBean.getCollectionTime();
Map ret = null;
if(_collectionCount!=null && _collectionTime!=null) {
ret = new HashMap();
ret.put("count", collectionCountP - _collectionCount);
ret.put("timeMs", collectionTimeP - _collectionTime);
}
_collectionCount = collectionCountP;
_collectionTime = collectionTimeP;
return ret;
}
}
@Override
public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) {
if(_prepareWasCalled && stormConf.get(Config.STORM_CLUSTER_MODE) != "local") {
throw new RuntimeException("A single worker should have 1 SystemBolt instance.");
}
_prepareWasCalled = true;
int bucketSize = RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();
context.registerMetric("uptimeSecs", new IMetric() {
@Override
public Object getValueAndReset() {
return jvmRT.getUptime()/1000.0;
}
}, bucketSize);
context.registerMetric("startTimeSecs", new IMetric() {
@Override
public Object getValueAndReset() {
return jvmRT.getStartTime()/1000.0;
}
}, bucketSize);
context.registerMetric("newWorkerEvent", new IMetric() {
boolean doEvent = true;
@Override
public Object getValueAndReset() {
if (doEvent) {
doEvent = false;
return 1;
} else return 0;
}
}, bucketSize);
final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
public Object invoke() {
return jvmMemRT.getHeapMemoryUsage();
}
}), bucketSize);
context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() {
public Object invoke() {
return jvmMemRT.getNonHeapMemoryUsage();
}
}), bucketSize);
for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
}
}
@Override
public void execute(Tuple input) {
throw new RuntimeException("Non-system tuples should never be sent to __system bolt.");
}
@Override
public void cleanup() {
}
}
4. system-topology!
system-topology! dynamically adds the metric component (MetricsConsumerBolt) and the system component (SystemBolt) to the topology, together with the corresponding stream information.
Concretely, system-topology! adds the following to the topology:
1. the acker, covered later
2. the metrics bolt, whose input is the METRICS-STREAM coming from every component's tasks; it has no output
3. the system bolt, which has no input and declares two TICK-STREAMs as output
4. for every component, two extra output streams: the metrics stream and the system stream
(defn system-topology! [storm-conf ^StormTopology topology]
(validate-basic! topology)
(let [ret (.deepCopy topology)]
(add-acker! storm-conf ret)
(add-metric-components! storm-conf ret)
(add-system-components! storm-conf ret)
(add-metric-streams! ret)
(add-system-streams! ret)
(validate-structure! ret)
ret
))
4.1 Adding components
Looking at the Thrift definitions, adding a bolt component to a topology just means adding a [string, Bolt] entry to a map.
The key point is how thrift/mk-bolt-spec* is used to create the bolt spec.
struct StormTopology {
1: required map<string, SpoutSpec> spouts;
2: required map<string, Bolt> bolts;
3: required map<string, StateSpoutSpec> state_spouts;
}
struct Bolt {
1: required ComponentObject bolt_object;
2: required ComponentCommon common;
}
struct ComponentCommon {
1: required map<GlobalStreamId, Grouping> inputs;
2: required map<string, StreamInfo> streams; //key is stream id, outputs
3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
4: optional string json_conf;
}
struct StreamInfo {
1: required list<string> output_fields;
2: required bool direct;
}
(defn add-metric-components! [storm-conf ^StormTopology topology]
(doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)] ;; as metrics-consumer-bolt-specs shows, this bolt takes METRICS-STREAM-ID as input and has no output
(.put_to_bolts topology comp-id bolt-spec)))
(defn add-system-components! [conf ^StormTopology topology]
(let [system-bolt-spec (thrift/mk-bolt-spec*
{} ;; empty inputs map: no input streams
(SystemBolt.) ;;object
{SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])} ;; outputs: two streams are declared, but the code never emits on them
:p 0
:conf {TOPOLOGY-TASKS 0})]
(.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
metric-components
First, every component in the topology (including the system component) needs to send its statistics to the metrics bolt, so component-ids-that-emit-metrics is all component ids plus SYSTEM-COMPONENT-ID.
For any such component, an input entry of the form {[comp-id METRICS-STREAM-ID] :shuffle} is added for the metrics bolt (shuffle grouping).
Then thrift/mk-bolt-spec* is wrapped into a local fn, mk-bolt-spec, for creating the bolt spec.
Finally, mk-bolt-spec is called to create the metrics bolt's spec, matching the definition above.
The key step is creating the MetricsConsumerBolt object, which requires reading the IMetricsConsumer implementation class and its argument from storm-conf.
This bolt takes the data received from the various tasks and calls handleDataPoints on it to produce metrics, as defined earlier.
(defn metrics-consumer-bolt-specs [storm-conf topology]
(let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
inputs (->> (for [comp-id component-ids-that-emit-metrics]
{[comp-id METRICS-STREAM-ID] :shuffle})
(into {}))
mk-bolt-spec (fn [class arg p]
(thrift/mk-bolt-spec*
inputs ;; the inputs map built above
(backtype.storm.metric.MetricsConsumerBolt. class arg) ;;object
{} ;; no output streams
:p p :conf {TOPOLOGY-TASKS p}))]
(map
(fn [component-id register]
[component-id (mk-bolt-spec (get register "class")
(get register "argument")
(or (get register "parallelism.hint") 1))])
(metrics-consumer-register-ids storm-conf)
(get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
4.2 Adding streams
Two output streams are added to every component:
METRICS-STREAM-ID, sent to the metrics bolt, with output-fields ["task-info" "data-points"]
SYSTEM-STREAM-ID, with output-fields ["event"]
(defn add-metric-streams! [^StormTopology topology]
(doseq [[_ component] (all-components topology)
:let [common (.get_common component)]]
(.put_to_streams common METRICS-STREAM-ID
(thrift/output-fields ["task-info" "data-points"]))))
(defn add-system-streams! [^StormTopology topology]
(doseq [[_ component] (all-components topology)
:let [common (.get_common component)]]
(.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))
This article is excerpted from cnblogs (博客园); originally published 2013-07-30.