Spark(十) -- Spark Streaming API编程
本文测试的Spark版本是1.3.1
Spark Streaming编程模型:
第一步:
需要一个StreamingContext对象,该对象是Spark Streaming操作的入口 ,而构建一个StreamingContext对象需要两个参数:
1、SparkConf对象:该对象是配置Spark 程序设置的,例如集群的Master节点,程序名等信息
2、Seconds对象:该对象设置了StreamingContext多久读取一次数据流
第二步:
构建好入口对象之后,直接调用该入口的方法读取各种不同方式传输过来的数据流,如:Socket,HDFS等方式。并会将数据转换成DStream对象进行统一操作
第三步:
DStream本身是一种RDD序列,Streaming接受数据流之后会进行切片,每个片都是一个RDD,而这些RDD最后都会包装到一个DStream对象中统一操作。在这个步骤中,进行对数据的业务处理
第四步:
调用入口对象的start和awaitTermination开始读取数据流
下面分别使用不同的Spark Streaming 处理方式完成WordCount单词计数
HDFS文件测试
object HDFSWordCount { def main(args: Array[String]) { //参数设置 if (args.length < 2) { System.err.println("Usgae : <spark master> <hdfs path>") System.exit(1) } //第一步:创建StreamingContext入口 val sparkConf = new SparkConf().setMaster(args(0)).setAppName("HDFSWordCount") val streaming = new StreamingContext(sparkConf,Seconds(10)) //第二步:调用textFileStream读取指定路径的文件 val data = streaming.textFileStream(args(1)) //第三步,数据业务处理 //使用flatMap将数据map之后的分切压成一个DStream val words = data.flatMap(_.split(" ")) val wordCount = words.map(x => (x,1)).reduceByKey(_+_) wordCount.print() //第四步 streaming.start() streaming.awaitTermination() }
Socket数据流测试
object NetworkWordCount { def main(args: Array[String]) { if (args.length < 3) { System.err.println("Usage : <spark master> <hostname> <port>") System.exit(1) } val sparkConf = new SparkConf().setMaster(args(0)).setAppName("NetworkWordCount") val streaming = new StreamingContext(sparkConf,Seconds(10)) //参数:1、主机名;2、端口号;3、存储级别 val data = streaming.socketTextStream(args(1),args(2).toInt,StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _) wordCount.print() streaming.start() streaming.awaitTermination() }
可以看到,对于同一中业务处理逻辑来说,不同的数据来源只要调用不同的方法接收即可,转换成DStream之后的处理步骤是一模一样的
下面的代码时配合测试Socket数据的,使用java命令执行jar包,传入参数:1、端口号;2、产生数据的频率(毫秒)
即可在指定的端口上产生数据提供Spark Streaming接收
package Streaming import java.net.ServerSocket import java.io.PrintWriter object Logger { def generateContent(index: Int): String = { import scala.collection.mutable.ListBuffer val charList = ListBuffer[Char]() for (i <- 65 to 90) { charList += i.toChar } val charArray = charList.toArray charArray(index).toString() } def index = { import java.util.Random val ran = new Random ran.nextInt(7) } def main(args: Array[String]): Unit = { if (args.length != 2) { System.err.println("Usage:<port> <millisecond>") System.exit(1) } val listener = new ServerSocket(args(0).toInt) while (true) { val socket = listener.accept() new Thread() { override def run = { println("Get client connected from:" + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) { Thread.sleep(args(1).toLong) val content = generateContent(index) println(content) out.write(content + '\n') out.flush() } socket.close() } }.start() } } }
在上述的例子中,文中使用的是Seconds(10)
也就是说每10秒钟处理一次数据
第一个10秒处理的结果是不会影响到第二个10秒的
但是有时候我们需要进行汇通统计,要用到之前几个10秒阶段的数据怎么办?
这里要用到一个updateStateByKey方法,该方法会保存上次计算数据的状态,以供下次计算使用。
上代码:
object StatefulWordCount { def main(args: Array[String]) { if (args.length < 3) { System.err.println("Usage : <spark master> <hostname> <port>") System.exit(1) } //定义一个匿名函数,并赋值给updateFunc //该函数是updateStateByKey方法的参数,该方法要求传入一个匿名参数且参数格式为values:Seq[Int],state:Option[Int] //其中values是当前的数据,state表示之前的数据 //这个匿名函数的作用就是将各个10秒阶段的结果累加汇总 val updateFunc = (values:Seq[Int],state:Option[Int]) => { val now = values.foldLeft(0)(_+_) val old = state.getOrElse(0) Some(now + old) } val conf = new SparkConf().setAppName("StatefulWordCount").setMaster(args(0)) val streaming = new StreamingContext(conf, Seconds(10)) //checkpoint会将数据放在指定的路径上,这个操作是必须的,为了保护数据,如果不设置会报异常 streaming.checkpoint(".") val lines = streaming.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordDStream = words.map(x => (x, 1)) //在这里将updateFunc传入 val stateDStream = wordDStream.updateStateByKey(updateFunc) stateDStream.print() streaming.start() streaming.awaitTermination() }
在Spark Streaming中还有一个window的概念,即滑动窗体
下图是官方文档中给出的解释:
使用滑动窗体要设置两个指定参数:
1、窗体长度
2、滑动时间
例如,设置一个窗体长度为5,滑动时间为2,意味着,每2秒处理上一个5秒内的数据流
这样的处理可以应用在例如微博统计最热搜索词
每2秒钟统计一次过去5秒内的最热搜索词
统计最热搜索词实例代码:
object WindowWordCount { def main(args: Array[String]) { if (args.length < 3) { System.err.println("Usage : <spark master> <hostname> <port> <Streaming Seconds> <Window Seconds> <Slide Seconds>") System.exit(1) } val conf = new SparkConf().setAppName("WindowWordCount").setMaster(args(0)) val streaming = new StreamingContext(conf, Seconds(args(3).toInt)) //checkpoint会将数据放在指定的路径上,这个操作是必须的,为了保护数据,如果不设置会报异常 streaming.checkpoint(".") val lines = streaming.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY) val words = lines.flatMap(_.split(" ")) //map操作之后数据的格式为: //(a,1)(b,1)...(n,1)格式 //调用reduceByKeyAndWindow替代普通的reduceByKey //最后两个参数分别是窗体长度和滑动时间 val wordCount = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(args(4).toInt), Seconds(args(5).toInt)) //对结果进行降序排序 //由于DStream本身不具备RDD的一些操作,调用transform方法可以让RDD的一些操作(例如sortByKey等)作用在其之上,返回的仍然是一个DStream对象 val sorted = wordCount.map { case (char, count) => (count, char) }.transform(_.sortByKey(false)).map { case (count, char) => (char, count) } sorted.print() streaming.start() streaming.awaitTermination() } }
reduceByKeyAndWindow有两种使用方法:
1、educeByKeyAndWindow(_ + _, Seconds(5),seconds(1))
2、reduceByKeyAndWindow(_ + , - _, Seconds(5),seconds(1))
二者的区别见下图:
第一种是简单粗暴的直接累加
而第二种方式就显得比较文雅和高效了
例如现在计算t+4的累积数据
第一种方式是,直接从t+…+(t+4)
第二种处理为,用已经计算好的(t+3)的数据加上(t+4)的数据,在减去(t-1)的数据,就可以得到和第一种方式一样的结果,但是中间复用了三个数据(t+1,t+2,t+3)
以上为Spark Streaming API的简单使用

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark(九) -- SparkSQL API编程
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45957991 本文测试的Spark版本是1.3.1 Text文本文件测试 一个简单的person.txt文件内容为: JChubby,13 Looky,14 LL,15 分别是Name和Age 在Idea中新建Object,原始代码如下: object TextFile{ def main(args:Array[String]){ } } SparkSQL编程模型: 第一步: 需要一个SQLContext对象,该对象是SparkSQL操作的入口 而构建一个SQLContext对象需要一个SparkContext 第二步: 构建好入口对象之后,要引入隐式转换的方法,作用是将读取到的各种文件转换成DataFrame,DataFrame是SparkSQL上进行统一操作的数据类型 第三步: 根据数据的格式,构建一个样例类。作用是提供将读取到的各种各样的数据类型隐式转换成一个统一的数据格式,方便编程 第四步: 使用SQLContext对象...
- 下一篇
2015中国大数据的市场容量有多大?
新兴产业的出现和发展有两种基本模式。一种是需求导向型,实际应用中出现了明显的痛点,必须要解决,不然就有人一直痛。另一种是技术导向型,革命性的技术先出现,慢慢地新技术扩大了用户的想象空间,进而激发出新的需求。大数据从概念提出到今天形成一个完整的产业,基本上属于第二种模式。 Hadoop生态系统下的技术(包括 pig,hive,spark,storm,hbase等)是目前大数据业界中事实上的标准。但在hadoop从互联网产业走出之前,大数据本身还不能称之为一个“产业”,因为它没有形成足够大的规模。所以大数据并不是指数据量有多大,是GB,TB还是PB,这其实没有关系。真正意义上的大数据是指 hadoop体系技术从互联网行业被引入到其它行业,进而得到快速、广泛、多维度、多层次的大量普及应用。大数据之大,在于应用规模的大,而不是数据量的大。现在大数据的应用已经远远超越了互联网行业,包括公安、智慧城市、医疗、交通、教育、通信、游戏、服装、地产、旅游、保险、银行、证券、食品安全、海事、零售、气象等等--世界正快速进入全面数据服务的时代! 大数据产业发展最快的一个是美国,另一个就是中国。有关中国大数据...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Linux系统CentOS6、CentOS7手动修改IP地址
- Red5直播服务器,属于Java语言的直播服务器
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Windows10,CentOS7,CentOS8安装Nodejs环境