storm从入门到放弃(一),storm介绍
背景:目前就职于国内最大的IT咨询公司,恰巧又是毕业季,所在部门招了20多个应届毕业生,本人要跟部门新人进行为期一个月的大数据入职培训,特此将整理的文档分享出来。
Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。Storm支持水平扩展,具有高容错性,保证每个消息都会得到处理。
Storm核心组件
Worker:运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。
Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,不同spout/bolt的task可能会共享一个物理线程,该线程称为executor。
Storm编程模型
public class RandomSentenceSpout extends BaseRichSpout { public void nextTuple() { collector.emit(new Values("+ - * % /")); Utils.sleep(50000); } ...... } public class SplitSentenceBolt extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { String sentence = (String)input.getValueByField("intsmaze"); System.out.println(Thread.currentThread().getId()+" "+sentence); } ...... } public class TwoBolt extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { String sentence = (String)input.getValueByField("intsmaze"); System.out.println(Thread.currentThread().getId()+" "+sentence); } ...... } public class WordCountTopologyMain { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout1", new RandomSentenceSpout(),1); builder.setBolt("two", new TwoBolt(),1).shuffleGrouping("spout1"); builder.setBolt("split1", new SplitSentenceBolt(),2).shuffleGrouping("spout1"); Config conf = new Config(); conf.setDebug(false); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); } } }
可以发现spout每隔一段时间间隔发一份数据,这份数据会被两个bolt同时接收,而不是说这次A bolt接收下次B bolt接收。 同一个bolt业务逻辑如果设置了并行度,他们才会根据分组策略依次接收上游发来的消息。
----------------84 + - * % / 这个是tow bolt接收 ----------------78 + - * % / 这个是split1 bolt 中78线程接收的 ----------------80 + - * % / 这个是split1 bolt中线程80接收的。 ----------------84 + - * % / ----------------78 + - * % / ----------------84 + - * % /
Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。
All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。
conf.setNumWorkers(4) 表示设置了4个worker来执行整个topology的所有组件 builder.setBolt("boltA-intsmaze",new BoltA(), 4) ---->指明 boltA组件的线程数excutors总共有4个 builder.setBolt("boltB-intsmaze",new BoltB(), 4) ---->指明 boltB组件的线程数excutors总共有4个 builder.setSpout("randomSpout-intsmaze",new RandomSpout(), 2) ---->指明randomSpout组件的线程数excutors总共有2个 -----意味着整个topology中执行所有组件的总线程数为4+4+2=10个 ----worker数量是4个,有可能会出现这样的负载情况,worker-1有2个线程,worker-2有2个线程,worker-3有3个线程,worker-4有3个线程 如果指定某个组件的具体task并发实例数 builder.setSpout("randomspout-intsmaze", new RandomWordSpout(), 4).setNumTasks(8); ----意味着对于这个组件的执行线程excutor来说,一个excutor将执行8/4=2个task,默认情况一个线程执行一个task.

