Spark学习之Spark Streaming(9)
Spark学习之Spark Streaming(9)
1. Spark Streaming允许用户使用一套和批处理非常接近的API来编写流式计算应用,这就可以大量重用批处理应用的技术甚至代码。
2. Spark Streaming使用离散化(discretized steam)作为抽象表示,叫做DStream。DStream是随时间推移而收到的数据的序列。
3. DSteam支持两种操作:转换操作(transformation),会生成一个新的DStream;另一种是输出操作(output operation),可以把数据写入到外部系统中。
4. Spark Stream的简单例子
需求:使用maven或者sbt打包编译出来独立应用的形式运行。从一台服务器的7777端口接受一个以换行符分隔的多行文本,要从中筛选出包含单词error的行,并打印出来。
//Maven 索引
groupID = org.apache.spark
artifactID = spark-steaming_2.10
version = 1.2.0
//Scala流计算import声明
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
//用Scala进行流式筛选,打印包含“error”的行
//从SparkConf创建StreamingContext并指定1秒钟的处理
val ssc = new SteamingContext(conf,Seconds(1))
//连接到本地机器7777端口上后,使用收到的数据创建DStream
val lines = ssc.socketTextStream("localhost",7777)
//从DStream中筛选出包含字符串“error”的行
val errorLines = lines.filter(_.contains("error"))
// 打印拥有“error”的行
errorLines.print()
//用Scala进行流式筛选,打印出包含“error”的行
ssc.start()
//等待作业完成
ssc.awaitTermination()
注意:一个Streaming context只能执行一次,所以只有在配置好所有DStream以及所需要的输出操作之后才启动。
最后:在Linux/Mac操作系统上运行流计算应用并提供数据
$spark-submit --class com.oreilly.learningsparkexamples.scala.streamingLogInput \
$ASSEMBLY_JAR local[4]
$ nc localhost 7777 # 使你可以键入输入的行来发送给服务器
Windows nc命令对应ncat
5. DStream 的转化操作可以分为两种:无状态(stateless)转化操作和有状态(stateful)转化操作。
5.1无状态转化操作中,每个批次的处理不依赖于之前批次的数据。
例如map()、filter()、reduceByKey()等。
5.2有状态转化操作中,需要使用之前批次的数据或者中间结果来计算当前批次的数据。
有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。
6. 输出操作
输出操作指定了对数据经转化操作得到的数据所要执行的操作(例如把结果输出推入外部数据库或输出到屏幕上)。
7. 输入源包括:核心数据源、附加数据源、多数据源与集群规模。
8. Steaming用户界面http://localhost:4040可以查看运行详细信息。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Spark学习之Spark SQL(8)
Spark学习之Spark SQL(8) 1. Spark用来操作结构化和半结构化数据的接口——Spark SQL、 2. Spark SQL的三大功能 2.1 Spark SQL可以从各种结构化数据(例如JSON、Hive、Parquet等)中读取数据。 2.2 Spark SQL不仅支持在Spark程序内使用SQL语句进行查询,也支持从类似商业智能软件Tableau这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接Spark SQL进行查询。 2.3 当在Spark程序内使用Spark SQL时,Spark SQL支持SQ与常规的Python/Java/Scala代码高度整合,包括连接RDD与SQL表、公开的自定义SQL函数接口等。 3. SchemaRDD(1.3版本后为DataFrame)是存放Row对象的RDD,每个Row对象代表一行记录。SchemaRDD还包含记录的结果信息(即数据字段)。 4. 连接Spark SQL 带有Hive支持的Spark SQL的Maven索引 groupID =org.apache.spark artifactID = spark...
-
下一篇
Spark学习之基于MLlib的机器学习
Spark学习之基于MLlib的机器学习 1. 机器学习算法尝试根据训练数据(training data)使得表示算法行为的数学目标最大化,并以此来进行预测或作出决定。 2. MLlib完成文本分类任务步骤: (1)首先用字符串RDD来表示你的消息 (2)运行MLlib中的一个特征提取(feature extraction)算法来把文本数据转换为数值特征(适合机器学习算法处理);该操作会返回一个向量RDD。 (3)对向量RDD调用分类算法(比如逻辑回归);这步会返回一个模型对象,可以使用该对象对新的数据点进行分类。 (4)使用MLlib的评估函数在测试数据集上评估模型。 3. MLlib包含的主要数据类型: Vector LabeledPoint Rating 各种Model类 4. 操作向量 向量有两种:稠密向量和稀疏向量 稠密向量:把所有维度的值存放在一个浮点数数组中 稀疏向量:只把各维度的非零值存储下来 优先考虑稀疏向量,也是关键的优化手段 创建向量的方式在各语言上有一些细微差别 5. 算法 特征提取 TF-IDF(词频——逆文档频率)使用用来从文本文档(例如网页)中生成特向量的...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS8编译安装MySQL8.0.19
- MySQL数据库在高并发下的优化方案
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- MySQL8.0.19开启GTID主从同步CentOS8
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS7,8上快速安装Gitea,搭建Git服务器