Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡)
Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到同一任务, 而不同的userid则会被分配到不同的任务
All Grouping: 广播发送,对于每一个tuple,Bolts中的所有任务都会收到.
Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.
Non Grouping: 随机分派,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,
Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者具体由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)
Fields Grouping 的代码
1 /**
2 * 数字累加求和
3 * 先添加storm依赖
4 */
5 public class LocalTopologySumFieldsGrouping {
6 /**
7 * spout需要继承baserichspout,实现未实现的方法
8 * @author Administrator
9 *
10 */
11 public static class MySpout extends BaseRichSpout{
12 private Map conf;
13 private TopologyContext context;
14 private SpoutOutputCollector collector;
15
16 /**
17 * 初始化方法,只会执行一次
18 * 在这里面可以写一个初始化的代码
19 * Map conf:其实里面保存的是topology的一些配置信息
20 * TopologyContext context:topology的上下文,类似于servletcontext
21 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
22 */
23 @Override
24 public void open(Map conf, TopologyContext context,
25 SpoutOutputCollector collector) {
26 this.conf = conf;
27 this.context = context;
28 this.collector = collector;
29 }
30
31 int num = 1;
32 /**
33 * 这个方法是spout中最重要的方法,
34 * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
35 * 每调用一次,会向外发射一条数据
36 */
37 @Override
38 public void nextTuple() {
39 System.out.println("spout发射:"+num);
40 //把数据封装到values中,称为一个tuple,发射出去
41 this.collector.emit(new Values(num++,num%2));
42 Utils.sleep(1000);
43 }
44
45 /**
46 * 声明输出字段
47 */
48 @Override
49 public void declareOutputFields(OutputFieldsDeclarer declarer) {
50 //给values中的数据起个名字,方便后面的bolt从这个values中取数据
51 //fields中定义的参数和values中传递的数值是一一对应的
52 declarer.declare(new Fields("num","flag"));
53 }
54 }
55
56 /**
57 * 自定义bolt需要实现baserichbolt
58 * @author Administrator
59 *
60 */
61 public static class MyBolt extends BaseRichBolt{
62 private Map stormConf;
63 private TopologyContext context;
64 private OutputCollector collector;
65
66 /**
67 * 和spout中的open方法意义一样
68 */
69 @Override
70 public void prepare(Map stormConf, TopologyContext context,
71 OutputCollector collector) {
72 this.stormConf = stormConf;
73 this.context = context;
74 this.collector = collector;
75 }
76
77 int sum = 0;
78 /**
79 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
80 */
81 @Override
82 public void execute(Tuple input) {
83 //input.getInteger(0);//也可以根据角标获取tuple中的数据
84 Integer value = input.getIntegerByField("num");
85 System.out.println("线程id:"+Thread.currentThread().getId()+",值:"+value);
86 //sum+=value;
87 //System.out.println("和:"+sum);
88 }
89
90 /**
91 * 声明输出字段
92 */
93 @Override
94 public void declareOutputFields(OutputFieldsDeclarer declarer) {
95 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
96 //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
97 }
98
99 }
100 /**
101 * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
102 * @param args
103 */
104 public static void main(String[] args) {
105 //组装topology
106 TopologyBuilder topologyBuilder = new TopologyBuilder();
107 topologyBuilder.setSpout("spout1", new MySpout());
108 //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
109 topologyBuilder.setBolt("bolt1", new MyBolt(),3).fieldsGrouping("spout1", new Fields("flag"));
110
111 //创建本地storm集群
112 LocalCluster localCluster = new LocalCluster();
113 localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
114 }
115
116 }
ShuffleGrouping代码
1 /**
2 * 数字累加求和
3 * 先添加storm依赖
4 */
5 public class LocalTopologySumShufferGrouping {
6 /**
7 * spout需要继承baserichspout,实现未实现的方法
8 * @author Administrator
9 *
10 */
11 public static class MySpout extends BaseRichSpout{
12 private Map conf;
13 private TopologyContext context;
14 private SpoutOutputCollector collector;
15
16 /**
17 * 初始化方法,只会执行一次
18 * 在这里面可以写一个初始化的代码
19 * Map conf:其实里面保存的是topology的一些配置信息
20 * TopologyContext context:topology的上下文,类似于servletcontext
21 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
22 */
23 @Override
24 public void open(Map conf, TopologyContext context,
25 SpoutOutputCollector collector) {
26 this.conf = conf;
27 this.context = context;
28 this.collector = collector;
29 }
30
31 int num = 1;
32 /**
33 * 这个方法是spout中最重要的方法,
34 * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
35 * 每调用一次,会向外发射一条数据
36 */
37 @Override
38 public void nextTuple() {
39 System.out.println("spout发射:"+num);
40 //把数据封装到values中,称为一个tuple,发射出去
41 this.collector.emit(new Values(num++));
42 Utils.sleep(1000);
43 }
44
45 /**
46 * 声明输出字段
47 */
48 @Override
49 public void declareOutputFields(OutputFieldsDeclarer declarer) {
50 //给values中的数据起个名字,方便后面的bolt从这个values中取数据
51 //fields中定义的参数和values中传递的数值是一一对应的
52 declarer.declare(new Fields("num"));
53 }
54
55 }
56
57
58 /**
59 * 自定义bolt需要实现baserichbolt
60 * @author Administrator
61 *
62 */
63 public static class MyBolt extends BaseRichBolt{
64 private Map stormConf;
65 private TopologyContext context;
66 private OutputCollector collector;
67
68 /**
69 * 和spout中的open方法意义一样
70 */
71 @Override
72 public void prepare(Map stormConf, TopologyContext context,
73 OutputCollector collector) {
74 this.stormConf = stormConf;
75 this.context = context;
76 this.collector = collector;
77 }
78
79 int sum = 0;
80 /**
81 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
82 */
83 @Override
84 public void execute(Tuple input) {
85 //input.getInteger(0);//也可以根据角标获取tuple中的数据
86 Integer value = input.getIntegerByField("num");
87 System.out.println("线程id:"+Thread.currentThread().getId()+",值:"+value);//这样可以知道哪个线程接收到这个数据了.
88 //sum+=value;
89 //System.out.println("和:"+sum);
90 }
91
92 /**
93 * 声明输出字段
94 */
95 @Override
96 public void declareOutputFields(OutputFieldsDeclarer declarer) {
97 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
98 //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
99 }
100
101 }
102 /**
103 * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
104 * @param args
105 */
106 public static void main(String[] args) {
107 //组装topology
108 TopologyBuilder topologyBuilder = new TopologyBuilder();
109 topologyBuilder.setSpout("spout1", new MySpout());
110 //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
111 topologyBuilder.setBolt("bolt1", new MyBolt(),3).globalGrouping("spout1");
112
113 //创建本地storm集群
114 LocalCluster localCluster = new LocalCluster();
115 localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
116 }
117 }
本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/5793975.html,如需转载请自行联系原作者