Spark-Spark Streaming-广告点击的在线黑名单过滤
任务
广告点击的在线黑名单过滤
使用 nc -lk 9999
在数据发送端口输入若干数据,比如:
1375864674543 Tom 1375864674553 Spy 1375864674571 Andy 1375864688436 Cheater 1375864784240 Kelvin 1375864853892 Steven 1375864979347 John
代码
import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds object OnlineBlackListFilter { def main(args: Array[String]){ /** * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息, * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如 */ // 创建SparkConf对象 val conf = new SparkConf() // 设置应用程序的名称,在程序运行的监控界面可以看到名称 conf.setAppName("OnlineBlackListFilter") // 此时,程序在Spark集群 conf.setMaster("spark://Master:7077") val ssc = new StreamingContext(conf, Seconds(30)) /** * 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中, * 黑名单的生成往往有复杂的业务逻辑,具体情况算法不同, * 但是在Spark Streaming进行处理的时候每次都能够访问完整的信息。 */ val blackList = Array(("Spy", true),("Cheater", true)) val blackListRDD = ssc.sparkContext.parallelize(blackList, 8) val adsClickStream = ssc.socketTextStream("Master", 9999) /** * 此处模拟的广告点击的每条数据的格式为:time、name * 此处map操作的结果是name、(time,name)的格式 */ val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) } adsClickStreamFormatted.transform(userClickRDD => { // 通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容, // 又获得了相应点击内容是否在黑名单中 val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD) /** * 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean)) * 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在的值。 * 如果存在的话,表明当前广告点击是黑名单,需要过滤掉,否则的话是有效点击内容; */ val validClicked = joinedBlackListRDD.filter(joinedItem => { if(joinedItem._2._2.getOrElse(false)) { false } else { true } }) validClicked.map(validClick => {validClick._2._1}) }).print /** * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费 */ ssc.start() ssc.awaitTermination() } } } **注:** //把程序的Batch Interval设置从30秒改成300秒: val ssc = new StreamingContext(conf, Seconds(300))
用spark-submit运行前面生成的jar包
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.test.spark.sparkstreaming.Filter --master spark://Master:7077 /root/Documents/SparkApps/Filter.jar
分析
- 5个job
Job 0:不体现业务逻辑代码,对后面计算的负载均衡的考虑
Job 0包含有Stage 0、Stage 1。
比如Stage 1,其中的Aggregated Metrics by Executor部分:
Stage在所有Executor上都存在.
Job 1:运行时间比较长,耗时1.5分钟
Stage 2,Aggregated Metrics By Executor部分:
Stage 2只在Worker上的一个Executor执行,而且执行了1.5分钟(4个worker),从业务处理的角度看,我们发送的那么一点数据,没有必要去启动一个运行1.5分钟的任务吧。那这个任务是做什么呢? 从DAG Visualization部分,就知道此Job实际就是启动了一个接收数据的Receiver:
Receiver是通过一个Job来启动的。那肯定有一个Action来触发它
Tasks部分:
只有一个Worker运行此Job,是用于接收数据。
Locality Level是PROCESS_LOCAL,原来是内存节点。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。Spark Streaming应用程序启动后,自己会启动一些Job。默认启动了一个Job来接收数据,为后续处理做准备。一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。Job 2:看Details可以发现有我们程序的主要业务逻辑,体现在Stag 3、Stag4、Stag 5中
Stag3、Stage4的详情,2个Stage都是用4个Executor执行的,所有数据处理是在4台机器上进行的。
:
Spark Core处理的每一步都是基于RDD的,RDD之间有依赖关系。上图中的RDD的DAG显示的是有3个Action,会触发3个job,RDD自下向上依 赖,RDD产生job就会具体的执行。从DSteam Graph中可以看到,DStream的逻辑与RDD基本一致,它就是在RDD的基础上加上了时间的依赖。RDD的DAG又可以叫空间维度,也就是说整个 Spark Streaming多了一个时间维度,也可以成为时空维度。
从这个角度来讲,可以将Spark Streaming放在坐标系中。其中Y轴就是对RDD的操作,RDD的依赖关系构成了整个job的逻辑,而X轴就是时间。随着时间的流逝,固定的时间间 隔(Batch Interval)就会生成一个job实例,进而在集群中运行。
对于Spark Streaming来说,当不同的数据来源的数据流进来的时候,基于固定的时间间隔,会形成一系列固定不变的数据集或event集合(例如来自flume 和kafka)。而这正好与RDD基于固定的数据集不谋而合,事实上,由DStream基于固定的时间间隔行程的RDD Graph正是基于某一个batch的数据集的。
从上图中可以看出,在每一个batch上,空间维度的RDD依赖关系都是一样 的,不同的是这个五个batch流入的数据规模和内容不一样,所以说生成的是不同的RDD依赖关系的实例,所以说RDD的Graph脱胎于DStream 的Graph,也就是说DStream就是RDD的模版,不同的时间间隔,生成不同的RDD Graph实例。
从Spark Streaming本身出发:
1.需要RDD DAG的生成模版:DStream Graph
2需要基于Timeline的job控制器
3需要inputStreamings和outputStreamings,代表数据的输入和输出
4具体的job运行在Spark Cluster之上,由于streaming不管集群是否可以消化掉,此时系统容错就至关重要
5事务处理,我们希望流进来的数据一定会被处理,而且只处理一次。在处理出现崩溃的情况下如何保证Exactly once的事务语意
从这里可以看出,DStream就是Spark Streaming的核心,就想Spark Core的核心是RDD,它也有dependency和compute。更为关键的是下面的代码:
这是一个HashMap,以时间为key,以RDD为value,这也正应证了随着时间流逝,不断的生成RDD,产生依赖关系的job,并通过jobScheduler在集群上运行。再次验证了DStream就是RDD的模版。
DStream可以说是逻辑级别的,RDD就是物理级别的,DStream所表达的最终都是通过RDD的转化实现的。前者是更高级别的抽象,后者是底层的实现。DStream实际上就是在时间维度上对RDD集合的封装,DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。
总结:
在 空间维度上的业务逻辑作用于DStream,随着时间的流逝,每个Batch Interval形成了具体的数据集,产生了RDD,对RDD进行transform操作,进而形成了RDD的依赖关系RDD DAG,形成job。然后jobScheduler根据时间调度,基于RDD的依赖关系,把作业发布到Spark Cluster上去运行,不断的产生Spark作业。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark-Caching /Checkpointing
功能: cacheing和checkpointing这2种操作是都是用来防止rdd(弹性分布式数据集)每次被引用时被重复计算带来的时间和空间上不必要的损失。 区别: Caching cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应用)可以运行的更快。有多种级别的持久化策略让开发者选择,使开发者能够对空间和计算成本进行权衡,同时能指定out of memory时对rdd的操作(缓存在内存或者磁盘,并且可以指定在内存不够的情况下按照FIFO的策略选取一部分block交换到磁盘来产生空余空间)。因此Spark不但可以对rdd重复计算还能在节点发生故障时重新计算丢失的分区。最后,被缓存的rdd存在于一个running的应用的生命周期内,如果这个应用终止了,那么缓存的rdd也会同时被删除。 Checkpointing checkpointing把rdd存储到一个可靠的存储系统(例如HDFS,S3)。checkpoint一个rdd有点类似于Hadoop中把中间计算结果存储到磁盘,损失部分执行性能来获得更好的从运行过程中出现failures时recover的能力。因为rdd是ch...
- 下一篇
Spark-SparkSQL深入学习系列一(转自OopsOutOfMemory)
/**Spark SQL源码分析系列文章*/ 自从去年SparkSubmit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1、整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里。这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql。 2、效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里。 前一段时间测试过Shark,并且对Spark SQL也进行了一些测试,但是还是忍不住对Spark SQL一探究竟,就从源代码的角度来看一下Spark SQL的核心执行流程吧。 一、引子 先来看一段简单的Spark SQL程序: [java] view plain copy 1.valsqlContext=neworg.apache.spark.sql.SQLContext(sc) 2.importsqlContext._ 3.caseclassPerson(name:String...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS关闭SELinux安全模块
- Linux系统CentOS6、CentOS7手动修改IP地址
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作