您现在的位置是:首页 > 文章详情

storm 读取不到对应的kafka数据

日期:2018-05-02点击:576

坑一:pom文件主要内容:注意里面 需要 使用 “exclusion”排除相关的依赖

 <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> <storm.version>1.1.1</storm.version> <kafka.version>0.9.0.0</kafka.version> </properties> <dependencies> <!--storm-core依赖--> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <!--storm-kafka 依赖--> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>${storm.version}</version> </dependency> <!-- kafka 依赖--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> </dependencies> 

坑二: input.getBinaryByField(“bytes”); 里面一定要写成bytes,这是上游kafkaSpout 传递过来,源码中也可以看到。
对应位置如下图

这里写图片描述

业务代码体现:
public void execute(Tuple input) {
try {
byte[] bytes = input.getBinaryByField(“bytes”);
String value = new String(bytes);
System.out.println(“value ” + value);
this.collector.ack(input);
} catch (Exception e) {
e.printStackTrace();
this.collector.fail(input);
}
}

坑三:本地测试是,一直接收不到kafkaSpout发送过来的消息:
1)问题是已经连接上了kafka,也读到了对应的分区
2)推断可能是上游的数据发送不过来—》 可能原因shuffleGrouping时 的参数传递错误。
3)最终发现 原来就是SPOUT_ID 获取错了
应该将下面代码中的

 String SPOUT_ID = kafkaSpout.getClass().getSimpleName() 替换成 String SPOUT_ID = KafkaSpout.class.getSimpleName(); 即可。 // kafka 使用的zk hosts BrokerHosts hosts = new ZkHosts("hadoop000:2181"); // 指定的kafak的一个根目录,存储的是kafkaSpout读取数据的位置信息(offset) SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString()); spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费 KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); String SPOUT_ID = kafkaSpout.getClass().getSimpleName(); builder.setSpout(SPOUT_ID, kafkaSpout); String BOLD_ID = LogProcessBolt.class.getSimpleName(); builder.setBolt(BOLD_ID, new LogProcessBolt()).shuffleGrouping(SPOUT_ID); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("StormToKafkaTopology", new Config(), builder.createTopology()); 

坑四: storm重复消费kafak数据:

官网解释如下: 

这里写图片描述

代码中配置为如下即可 SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString()); spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费 

坑五: storm消费数据,ack,fail这些比配,如果出现问题还可以重试

这里写图片描述

原文链接:https://yq.aliyun.com/articles/624489
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章