sparkStreaming与Kafka整合
createStream那几个参数折腾了我好久。。网上都是一带而过,最终才搞懂..关于sparkStreaming的还是太少,最终尝试成功。。。
首先启动zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafka
bin/kafka-server-start.sh config/server.properties &
创建一个topic
./kafka-topics.sh --create --zookeeper 192.168.77.133:2181 \ --replication-factor 1\ --partitions 1\ --topic yangsy
随后启动一个终端为9092的提供者
./kafka-console-producer.sh --broker-list 192.168.77.133:9092 --topic yangsy
代码如下:
import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by root on 11/28/15. */ object SparkStreaming { def main(args: Array[String]) { /* val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp") .set("spark.executor.memory", "1g") val sc = new StreamingContext(sparkConf, Seconds(20)) val lines = sc.textFileStream("/usr/local/spark-1.4.0-bin-2.5.0-cdh5.2.1/streaming") val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() sc.start() sc.awaitTermination()*/ //zookeeper的地址 val zkQuorum = "192.168.77.133:2181"
//group_id可以通过kafka的conf下的consumer.properties中查找 val group ="test-consumer-group"
//创建的topic 可以是一个或多个 val topics = "yangsy" val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").set("spark.executor.memory", "1g") val sc = new StreamingContext(sparkConf, Seconds(2)) val numThreads = 2 val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
//StorageLevel.MEMORY_AND_DISK_SER为存储的级别 val lines = KafkaUtils.createStream(sc, zkQuorum, group, topicpMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
//对于收到的消息进行wordcount val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() sc.start() sc.awaitTermination() } }
随后再你启动的kafka的生产者终端随便输入消息,我这里设置的参数是每2秒获取一次,统计一次单词个数~OK~

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
spark RDD transformation与action函数整理
1.创建RDD val lines = sc.parallelize(List("pandas","i like pandas")) 2.加载本地文件到RDD val linesRDD = sc.textFile("yangsy.txt") 3.过滤 filter 需要注意的是 filter并不会在原有RDD上过滤,而是根据filter的内容重新创建了一个RDD val spark = linesRDD.filter(line => line.contains("damowang")) 4.count() 也是aciton操作 由于spark为懒加载 之前的语句不管对错其实都没执行 只有到调用action 如count() first() foreach()等操作的时候 才会真正去执行 spark.count() 5.foreach(println) 输出查看数据 (使用take可获取少量数据,如果工程项目中为DataFrame,可以调用show(1)) 这里提到一个东西,就是调用collect()函数 这个函数会将所有数据加载到driver端,一般数据量巨大的时候还是不要调用c...
- 下一篇
MapReduce InputFormat之FileInputFormat
一:简单认识InputFormat类 InputFormat主要用于描述输入数据的格式,提供了以下两个功能: 1)、数据切分,按照某个策略将输入数据且分成若干个split,以便确定Map Task的个数即Mapper的个数,在MapReduce框架中,一个split就意味着需要一个Map Task; 2)为Mapper提供输入数据,即给定一个split,(使用其中的RecordReader对象)将之解析为一个个的key/value键值对。 下面我们先来看以下1.0版本中的老的InputFormat接口: Java代码 publicinterfaceInputFormat<K,V>{ //获取所有的split分片 publicInputSplit[]getSplits(JobConfjob,intnumSplits)throwsIOException; //获取读取split的RecordReader对象,实际上是由RecordReader对象将 //split解析成一个个的key/value对儿 publicRecordReader<K,V>getRec...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2全家桶,快速入门学习开发网站教程
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2整合Thymeleaf,官方推荐html解决方案