1.全局定时器
1 import java.util.Map;
2
3 import backtype.storm.Config;
4 import backtype.storm.Constants;
5 import backtype.storm.LocalCluster;
6 import backtype.storm.spout.SpoutOutputCollector;
7 import backtype.storm.task.OutputCollector;
8 import backtype.storm.task.TopologyContext;
9 import backtype.storm.topology.OutputFieldsDeclarer;
10 import backtype.storm.topology.TopologyBuilder;
11 import backtype.storm.topology.base.BaseRichBolt;
12 import backtype.storm.topology.base.BaseRichSpout;
13 import backtype.storm.tuple.Fields;
14 import backtype.storm.tuple.Tuple;
15 import backtype.storm.tuple.Values;
16 import backtype.storm.utils.Utils;
17
18 /**
19 * 全局定时器
20 *
21 * 数字累加求和
22 * 先添加storm依赖
23 *
24 * @author Administrator
25 *
26 */
27 public class LocalTopologySumTimer1 {
28
29
30 /**
31 * spout需要继承baserichspout,实现未实现的方法
32 * @author Administrator
33 *
34 */
35 public static class MySpout extends BaseRichSpout{
36 private Map conf;
37 private TopologyContext context;
38 private SpoutOutputCollector collector;
39
40 /**
41 * 初始化方法,只会执行一次
42 * 在这里面可以写一个初始化的代码
43 * Map conf:其实里面保存的是topology的一些配置信息
44 * TopologyContext context:topology的上下文,类似于servletcontext
45 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
46 */
47 @Override
48 public void open(Map conf, TopologyContext context,
49 SpoutOutputCollector collector) {
50 this.conf = conf;
51 this.context = context;
52 this.collector = collector;
53 }
54
55 int num = 1;
56 /**
57 * 这个方法是spout中最重要的方法,
58 * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
59 * 每调用一次,会向外发射一条数据
60 */
61 @Override
62 public void nextTuple() {
63 System.out.println("spout发射:"+num);
64 //把数据封装到values中,称为一个tuple,发射出去
65 this.collector.emit(new Values(num++));
66 Utils.sleep(1000);
67 }
68
69 /**
70 * 声明输出字段
71 */
72 @Override
73 public void declareOutputFields(OutputFieldsDeclarer declarer) {
74 //给values中的数据起个名字,方便后面的bolt从这个values中取数据
75 //fields中定义的参数和values中传递的数值是一一对应的
76 declarer.declare(new Fields("num"));
77 }
78
79 }
80
81
82 /**
83 * 自定义bolt需要实现baserichbolt
84 * @author Administrator
85 *
86 */
87 public static class MyBolt extends BaseRichBolt{
88 private Map stormConf;
89 private TopologyContext context;
90 private OutputCollector collector;
91
92 /**
93 * 和spout中的open方法意义一样
94 */
95 @Override
96 public void prepare(Map stormConf, TopologyContext context,
97 OutputCollector collector) {
98 this.stormConf = stormConf;
99 this.context = context;
100 this.collector = collector;
101 }
102
103 int sum = 0;
104 /**
105 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
106 */
107 @Override
108 public void execute(Tuple input) {
109 if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
110 //如果满足,就说明这个tuple是系统几倍的组件发送的,也就意味着定时时间到了
111 System.out.println("定时任务执行了。");
112
113 }else{//这个地方必须要做判断,否则让系统级别的tuple去取"num"会取不到报错的.
114 //这个地方的逻辑可以将产生的数据封装成一个map或者是list放在内存中.到达定时任务的时候取出来,使用batch批处理向数据库中操作.
115 //然后再把集合中的数据清空...之后再添加.
116
117 //input.getInteger(0);//也可以根据角标获取tuple中的数据
118 Integer value = input.getIntegerByField("num");
119 sum+=value;
120 System.out.println("和:"+sum);
121 }
122
123 }
124
125 /**
126 * 声明输出字段
127 */
128 @Override
129 public void declareOutputFields(OutputFieldsDeclarer declarer) {
130 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
131 //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
132 }
133
134 }
135 /**
136 * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
137 * @param args
138 */
139 public static void main(String[] args) {
140 //组装topology
141 TopologyBuilder topologyBuilder = new TopologyBuilder();
142 topologyBuilder.setSpout("spout1", new MySpout());
143 //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
144 topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
145
146 //创建本地storm集群
147 LocalCluster localCluster = new LocalCluster();
148 Config config = new Config();
149 //下面这样设置就是一个全局的定时任务 还有局部的定时任务.
150 config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//表示每隔5秒storm会给Topology中的所有bolt发射一个系统级别的tuple
151 //前面的单词计数的例子 我们可能只需要在最后一个CountBolt中做定时任务 SpiltBolt中不需要做定时任务 但是两个Bolt中都可以收到这个系统级别的tuple
152 //所以需要每个Bolt中都做判断...SplitBolt可以加上一个判断 没有方法体...if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){ }
153 //否则会出错...从系统级别的tuple取你定义的值 取不到 报错.
154 localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
155
156 }
157
158
159 }
局部定时器
1 /**
2 * 局部定时器
3 *
4 * 数字累加求和
5 * 先添加storm依赖
6 *
7 * @author Administrator
8 *
9 */
10 public class LocalTopologySumTimer2 {
11
12
13 /**
14 * spout需要继承baserichspout,实现未实现的方法
15 * @author Administrator
16 *
17 */
18 public static class MySpout extends BaseRichSpout{
19 private Map conf;
20 private TopologyContext context;
21 private SpoutOutputCollector collector;
22
23 /**
24 * 初始化方法,只会执行一次
25 * 在这里面可以写一个初始化的代码
26 * Map conf:其实里面保存的是topology的一些配置信息
27 * TopologyContext context:topology的上下文,类似于servletcontext
28 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
29 */
30 @Override
31 public void open(Map conf, TopologyContext context,
32 SpoutOutputCollector collector) {
33 this.conf = conf;
34 this.context = context;
35 this.collector = collector;
36 }
37
38 int num = 1;
39 /**
40 * 这个方法是spout中最重要的方法,
41 * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
42 * 每调用一次,会向外发射一条数据
43 */
44 @Override
45 public void nextTuple() {
46 System.out.println("spout发射:"+num);
47 //把数据封装到values中,称为一个tuple,发射出去
48 this.collector.emit(new Values(num++));
49 Utils.sleep(1000);
50 }
51
52 /**
53 * 声明输出字段
54 */
55 @Override
56 public void declareOutputFields(OutputFieldsDeclarer declarer) {
57 //给values中的数据起个名字,方便后面的bolt从这个values中取数据
58 //fields中定义的参数和values中传递的数值是一一对应的
59 declarer.declare(new Fields("num"));
60 }
61
62 }
63
64
65 /**
66 * 自定义bolt需要实现baserichbolt
67 * @author Administrator
68 *
69 */
70 public static class MyBolt extends BaseRichBolt{
71 private Map stormConf;
72 private TopologyContext context;
73 private OutputCollector collector;
74
75 /**
76 * 和spout中的open方法意义一样
77 */
78 @Override
79 public void prepare(Map stormConf, TopologyContext context,
80 OutputCollector collector) {
81 this.stormConf = stormConf;
82 this.context = context;
83 this.collector = collector;
84 }
85
86 int sum = 0;
87 /**
88 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
89 */
90 @Override
91 public void execute(Tuple input) {
92 if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
93 //如果满足,就说明这个tuple是系统几倍的组件发送的,也就意味着定时时间到了
94 System.out.println("定时任务执行了。");
95
96 }else{
97 //input.getInteger(0);//也可以根据角标获取tuple中的数据
98 Integer value = input.getIntegerByField("num");
99 sum+=value;
100 System.out.println("和:"+sum);
101 }
102
103 }
104
105 /**
106 * 声明输出字段
107 */
108 @Override
109 public void declareOutputFields(OutputFieldsDeclarer declarer) {
110 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
111 //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
112 }
113
114 /**
115 * 局部定时任务
116 * 只针对当前的bolt 对其他的bolt中没有影响
117 * 加对系统级别tuple的判断只需要在当前bolt中判断就可以...其他bolt不需要..
118 * 这种在工作中最常用....
119 * 全局定时任务在 main方法中 设置 局部的定时任务只需要在Bolt类中覆盖getComponentConfiguration()方法
120 * 这个还是比较有用,有意思的
121 */
122 @Override
123 public Map<String, Object> getComponentConfiguration() {
124 HashMap<String, Object> hashMap = new HashMap<String, Object>();
125 hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
126 return hashMap;
127 }
128 }
129 /**
130 * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
131 * @param args
132 */
133 public static void main(String[] args) {
134 //组装topology
135 TopologyBuilder topologyBuilder = new TopologyBuilder();
136 topologyBuilder.setSpout("spout1", new MySpout());
137 //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
138 topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
139
140 //创建本地storm集群
141 LocalCluster localCluster = new LocalCluster();
142 Config config = new Config();
143 localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
144
145 }
146
147
148 }
本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6671496.html,如需转载请自行联系原作者