Kafka实战-Kafka到Storm
1.概述
在《Kafka实战-Flume到Kafka》一文中给大家分享了Kafka的数据源生产,今天为大家介绍如何去实时消费Kafka中的数据。这里使用实时计算的模型——Storm。下面是今天分享的主要内容,如下所示:
- 数据消费
- Storm计算
- 预览截图
接下来,我们开始分享今天的内容。
2.数据消费
Kafka的数据消费,是由Storm去消费,通过KafkaSpout将数据输送到Storm,然后让Storm安装业务需求对接受的数据做实时处理,下面给大家介绍数据消费的流程图,如下图所示:
从图可以看出,Storm通过KafkaSpout获取Kafka集群中的数据,在经过Storm处理后,结果会被持久化到DB库中。
3.Storm计算
接着,我们使用Storm去计算,这里需要体检搭建部署好Storm集群,若是未搭建部署集群,大家可以参考我写的《Kafka实战-Storm Cluster》。这里就不多做赘述搭建的过程了,下面给大家介绍实现这部分的代码,关于KafkaSpout的代码如下所示:
- KafkaSpout类:
package cn.hadoop.hdfs.storm; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * @Date Jun 10, 2015 * * @Author dengjie * * @Note Data sources using KafkaSpout to consume Kafka */ public class KafkaSpout implements IRichSpout { /** * */ private static final long serialVersionUID = -7107773519958260350L; private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSpout.class); SpoutOutputCollector collector; private ConsumerConnector consumer; private String topic; private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.ZK); props.put("group.id", KafkaProperties.GROUP_ID); props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public KafkaSpout(String topic) { this.topic = topic; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void close() { // TODO Auto-generated method stub } public void activate() { this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig()); Map<String, Integer> topickMap = new HashMap<String, Integer>(); topickMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap); KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { String value = new String(it.next().message()); LOGGER.info("(consumer)==>" + value); collector.emit(new Values(value), value); } } public void deactivate() { // TODO Auto-generated method stub } public void nextTuple() { // TODO Auto-generated method stub } public void ack(Object msgId) { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("KafkaSpout")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
KafkaTopology类:
package cn.hadoop.hdfs.storm.client; import cn.hadoop.hdfs.storm.FileBlots; import cn.hadoop.hdfs.storm.KafkaSpout; import cn.hadoop.hdfs.storm.WordsCounterBlots; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; /** * @Date Jun 10, 2015 * * @Author dengjie * * @Note KafkaTopology Task */ public class KafkaTopology { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("testGroup", new KafkaSpout("test")); builder.setBolt("file-blots", new FileBlots()).shuffleGrouping("testGroup"); builder.setBolt("words-counter", new WordsCounterBlots(), 2).fieldsGrouping("file-blots", new Fields("words")); Config config = new Config(); config.setDebug(true); if (args != null && args.length > 0) { // online commit Topology config.put(Config.NIMBUS_HOST, args[0]); config.setNumWorkers(3); try { StormSubmitter.submitTopologyWithProgressBar(KafkaTopology.class.getSimpleName(), config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } } else { // Local commit jar LocalCluster local = new LocalCluster(); local.submitTopology("counter", config, builder.createTopology()); try { Thread.sleep(60000); } catch (InterruptedException e) { e.printStackTrace(); } local.shutdown(); } } }
4.预览截图
首先,我们启动Kafka集群,目前未生产任何消息,如下图所示:
接下来,我们启动Flume集群,开始收集日志信息,将数据输送到Kafka集群,如下图所示:
接下来,我们启动Storm UI来查看Storm提交的任务运行状况,如下图所示:
最后,将统计的结果持久化到Redis或者MySQL等DB中,结果如下图所示:
5.总结
这里给大家分享了数据的消费流程,并且给出了持久化的结果预览图,关于持久化的细节,后面有单独有一篇博客会详细的讲述,给大家分享其中的过程,这里大家熟悉下流程,预览结果即可。
6.结束语
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Hadoop项目实战-用户行为分析之应用概述(二)
1.概述 本课程的视频教程地址:《项目整体概述》 本节给大家分享的主题如下图所示: 下面我开始为大家分享第二节的内容——《项目整体概述》,下面开始今天的分享内容。 2.内容 从本节开始,我们将进入到Hadoop项目的整体概述一节学习,本节课程为大家介绍的主要知识点有一下内容,如下图所示: 下面,我们首先来看看项目的整体流程,其流程如下图所示: 项目流程可以分为4个模块,他们分别是数据收集,集群存储,分析计算和结果处理。 下面我分别为大家讲解这4个模块的作用。 我们知道,在做统计时,数据源是前提,有了数据源我们才能在此基础上做相应的计算和分析。 收集数据一般都有专门的集群去负责收集这方面的工作。 在完成收集工作后,我们需要将这些文件集中起来,这里存储采用的是分布式文件系统(HDFS)。我们将收集的数据 按一定的规则分类,并存储在指定的HDFS文件系统中。从收集到存储,数据源的准备阶段就算完成了。接着,我们可以对数据源进行相关指标的分析与计算,在 Hadoop 2.x 版本后编程模型有了良好的拓展,除了支持MapReduce,还支持其以外的模型,如:Spark。另外,还有Hive,Pig,...
- 下一篇
ElasticSearch实战-日志监控平台
1.概述 在项目业务倍增的情况下,查询效率受到影响,这里我们经过讨论,引进了分布式搜索套件——ElasticSearch,通过分布式搜索来解决当下业务上存在的问题。下面给大家列出今天分析的目录: ElasticSearch 套件介绍 ElasticSearch 应用场景和案例 平台架构 下面开始今天的内容分享。 2.ElasticSearch 套件 2.1LogStash LogStash是一个开源的、免费的日志收集工具,属于Elastic家族的一员,负责将收集的日志信息输送到ElasticSearch,为ElasticSearch提供数据源。 2.2ElasticSearch ElasticSearch是一个开源的分布式搜索引擎,具备高可靠性,支持非常多的企业级搜索用例。像Solr4一样,是基于Lucene构 建的。支持时间索引和全文检索。官网:https://www.elastic.co 它对外提供一系列基于Java和HTTP的API,用于索引、检索、修改大多数配置。 2.3Kibana Kibana也是开源和免费的工具,同样也是Elastic家族的一员,它可以帮助我们汇总、分析和...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Windows10,CentOS7,CentOS8安装Nodejs环境
- MySQL8.0.19开启GTID主从同步CentOS8
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- 设置Eclipse缩进为4个空格,增强代码规范