storm 读取不到对应的kafka数据
坑一:pom文件主要内容:注意里面 需要 使用 “exclusion”排除相关的依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<storm.version>1.1.1</storm.version>
<kafka.version>0.9.0.0</kafka.version>
</properties>
<dependencies>
<!--storm-core依赖-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--storm-kafka 依赖-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- kafka 依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
坑二: input.getBinaryByField(“bytes”); 里面一定要写成bytes,这是上游kafkaSpout 传递过来,源码中也可以看到。
对应位置如下图
业务代码体现:
public void execute(Tuple input) {
try {
byte[] bytes = input.getBinaryByField(“bytes”);
String value = new String(bytes);
System.out.println(“value ” + value);
this.collector.ack(input);
} catch (Exception e) {
e.printStackTrace();
this.collector.fail(input);
}
}
坑三:本地测试是,一直接收不到kafkaSpout发送过来的消息:
1)问题是已经连接上了kafka,也读到了对应的分区
2)推断可能是上游的数据发送不过来—》 可能原因shuffleGrouping时 的参数传递错误。
3)最终发现 原来就是SPOUT_ID 获取错了
应该将下面代码中的
String SPOUT_ID = kafkaSpout.getClass().getSimpleName()
替换成
String SPOUT_ID = KafkaSpout.class.getSimpleName();
即可。
// kafka 使用的zk hosts
BrokerHosts hosts = new ZkHosts("hadoop000:2181");
// 指定的kafak的一个根目录,存储的是kafkaSpout读取数据的位置信息(offset)
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
String SPOUT_ID = kafkaSpout.getClass().getSimpleName();
builder.setSpout(SPOUT_ID, kafkaSpout);
String BOLD_ID = LogProcessBolt.class.getSimpleName();
builder.setBolt(BOLD_ID, new LogProcessBolt()).shuffleGrouping(SPOUT_ID);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("StormToKafkaTopology", new Config(), builder.createTopology());
坑四: storm重复消费kafak数据:
官网解释如下:
代码中配置为如下即可
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费
坑五: storm消费数据,ack,fail这些比配,如果出现问题还可以重试
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
云数据库产品月刊·4月刊
云数据库产品月刊旨在将数据库行业最新动态传播给广大数据库用户,月刊每月初发布,重点解读数据库行业最新态势、阿里云数据库最新上线功能以及典型数据库行业应用案例,干货多多,建议您订阅关注,让您时刻走在数据库行业前沿。 【本月重点】 1.阿里云新一代自研关系型云数据库POLARDB商用上线。 2.云数据库MySQL 5.7 高可用版发布,支持SSD本地盘、SSD云盘。 3.云数据库SQLServer2016&2012 发布双机高可用版,中国Region全开。 4.云数据库Redis混合存储规格系列公测开放。 5.云数据库MongoDB发布秒级监控功能。 【产品迭代详情】 关系型云数据库POLARDB、MySQL、SQLServer、PostgreSQ
-
下一篇
Spark2.1.0之初体验
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80042366 在《Spark2.1.0之运行环境准备》一文中,已经介绍了如何准备好基本的Spark运行环境,现在是时候实践一下,以便于在使用过程中提升读者对于Spark最直接的感触!本文通过Spark的基本使用,让读者对Spark能有初步的认识,便于引导读者逐步深入学习。 运行spark-shell 在《Spark2.1.0之运行环境准备》一文曾经简单运行了spark-shell,并用下图进行了展示(此处再次展示此图)。 图1执行spark-shell进入Scala命令行 图1中显示了很多信息,这里进行一些说明: 在安装完Spark 2.1.0后,如果没有明确指定log4j的配置,那么Spark会使用core模块的org/apache/spark/目录下的log4j-defaults.properties作为log4j的默认配置。log4j-defaults.properties指定的Spark日志级别为WARN。用户可以到S...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Linux系统CentOS6、CentOS7手动修改IP地址
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker容器配置,解决镜像无法拉取问题
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- MySQL数据库中FOR UPDATE的使用

微信收款码
支付宝收款码