Spark Streaming连接Kafka入门教程
我的原创地址:https://dongkelun.com/2018/05/17/sparkKafka/
前言
首先要安装好kafka,这里不做kafka安装的介绍,本文是Spark Streaming入门教程,只是简单的介绍如何利用spark 连接kafka,并消费数据,由于博主也是才学,所以其中代码以实现为主,可能并不是最好的实现方式。
1、对应依赖
根据kafka版本选择对应的依赖,我的kafka版本为0.10.1,spark版本2.2.1,然后在maven仓库找到对应的依赖。
(Kafka项目在版本0.8和0.10之间引入了新的消费者API,因此有两个独立的相应Spark Streaming软件包可用)
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.1</version> </dependency>
我用的是sbt,对应的依赖:
"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.1"
2、下载依赖
在命令行执行
sbt eclipse
(我用的是eclipse sbt,具体可看我的其他博客,具体命令根据自己的实际情况)
3、创建topic
创建测试用topic top1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic top1
4、启动程序
下好依赖之后,根据官方文档提供的示例进行代码测试
下面的代码示例,主要实现spark 连接kafka,并将接收的数据打印出来,没有实现复杂的功能。
package com.dkl.leanring.spark.kafka import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe object KafaDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("KafaDemo") //刷新时间设置为1秒 val ssc = new StreamingContext(conf, Seconds(1)) //消费者配置 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "10.180.29.180:6667", //kafka集群地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "group", //消费者组名 "auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量 "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交 val topics = Array("top1") //消费主题,可以同时消费多个 //创建DStream,返回接收到的输入数据 val stream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) //打印获取到的数据,因为1秒刷新一次,所以数据长度大于0时才打印 stream.foreachRDD(f => { if (f.count > 0) f.foreach(f => println(f.value())) }) ssc.start(); ssc.awaitTermination(); } }
启动上面的程序(本地eclipse启动即可)
需要记住的要点
当在本地运行一个 Spark Streaming 程序的时候,不要使用 “local” 或者 “local[1]” 作为 master 的 URL 。这两种方法中的任何一个都意味着只有一个线程将用于运行本地任务。如果你正在使用一个基于接收器(receiver)的输入离散流(input DStream)(例如, sockets ,Kafka ,Flume 等),则该单独的线程将用于运行接收器(receiver),而没有留下任何的线程用于处理接收到的数据。因此,在本地运行时,总是用 “local[n]” 作为 master URL ,其中的 n > 运行接收器的数量。
将逻辑扩展到集群上去运行,分配给 Spark Streaming 应用程序的内核(core)的内核数必须大于接收器(receiver)的数量。否则系统将接收数据,但是无法处理它。
我一开始没有看到官网提醒的这一点,将示例中的local[2]改为local,现在已经在代码里改回local[2]了,但是下面的截图没有替换,注意下。
5、发送消息
运行producer
bin/kafka-console-producer.sh --broker-list localhost:6667 --topic top1
然后依次发送下面几个消息
hadoop spark kafka 中文测试
6、结果
然后在eclipse console就可以看到对应的数据了。
hadoop spark kafka 中文测试
为了直观的展示和理解,附上截图:
发送消息
结果
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
使用cu管家管理Maxcompute多项目
使用cu管家管理Maxcompute多项目 我们在maxcompute的实际使用中,采用了多项目的方式,具体可参看之前写的一篇博客:MaxCompute多团队协同数据开发项目管理最佳实践虽然有很多优点,但是实际使用过程中还需要maxcompute管家来配合管理这些项目: 1. 各个项目资源监控 能够一目了然的看到每个项目使用资源的情况 每个时段计算资源和存储资源的使用情况,有利于我们发现一些性能比较差的任务 如图红框的那个时段可能会有性能较差的任务 2. 错峰运行(系统状态) 会有这样的一些任务:运行时间比较长,消耗资源比较大,但是产出时间不需要那么及时,这时候我们可以将这些任务定时在自己资源相对比较空闲的时段运行 如图:类似的任务可放在17点半以后执行 3. 计算资源分配和隔离(Quota设置) 对于计算资源使用比较大的用户来说,显然购买预付费资源要划算很多,如果所有项目都平等的使用这个资源池,则难免会发生资源争抢的情况,而有些重要的业务线(项目),需要优先分配更多的计算资源; 团队开发人员的水平参差不齐,一个性能比较差的查询很可能影响到多个项目的任务,我们目前是每个项目一个quot...
- 下一篇
spark ML之特征处理(1)
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80366311 我的原创地址:https://dongkelun.com/2018/05/17/sparkMlFeatureProcessing1/ 前言 最近在学习总结机器学习常用算法,在看spark机器学习决策树的官方示例时,发现用到了几个特征处理的类,之前没学习过,所以查了一下,感觉spark在特征处理方面的类还是挺多的,所以准备总结记录一下相关的用法,首先总结一下决策树中用到的几种。 1、VectorIndexer 根据源码注释,VectorIndexer是用于在“向量”的数据集中索引分类特征列的类(Class for indexing categorical feature columns in a dataset of Vector),这看起来不太好理解,直接看用法,举例说明就好了。 1.1 数据 我们用普通的数据格式即可: data1.txt 1,-1.0...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,CentOS8安装Elasticsearch6.8.6