Spark Streaming Crash 如何保证Exactly Once Semantics
前言
- 数据接收。我在用的过程中确实产生了问题。
- 应用的可靠性。因为SS是7*24小时运行的问题,我想知道如果它Crash了,会不会丢数据。
SS 自身可以做到at least once语义
CheckPoint 机制
org.apache.spark.streaming.Checkpoint
val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDurationval pendingTimes = ssc.scheduler.getPendingTimes().toArray val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConfPairs = ssc.conf.getAll
其他的都比较容易理解,最重要的是 graph,该类全路径名是: org.apache.spark.streaming.DStreamGraph
private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]()
protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData
data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
两个结论
- checkpoint 是非常高效的。没有涉及到实际数据的存储。一般大小只有几十K,因为只存了Kafka的偏移量等信息。
- checkpoint 采用的是序列化机制,尤其是DStreamGraph的引入,里面包含了可能如ForeachRDD等,而ForeachRDD里面的函数应该也会被序列化。如果采用了CheckPoint机制,而你的程序包做了做了变更,恢复后可能会有一定的问题。
- 产生jobs
- 成功则提交jobs 然后异步执行
- 失败则会发出一个失败的事件
- 无论成功或者失败,都会发出一个 DoCheckpoint 事件。
- 当任务运行完成后,还会再调用一次DoCheckpoint 事件。
- 我们没有处理一条数据
- 我们可能只处理了部分数据
- 我们处理了全部数据
业务需要做事务,保证 Exactly Once 语义
- 幂等操作
- 业务代码需要自身添加事物操作
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }
后话

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
(课程)基于Spark的机器学习经验
Hi,大家好!我是祝威廉,本来微博也想叫祝威廉的,可惜被人占了,于是改名叫·祝威廉二世。然后总感觉哪里不对。目前在乐视云数据部门里从事实时计算,数据平台、搜索和推荐等多个方向。曾从事基础框架,搜索研发四年,大数据平台架构、推荐三年多,个人时间现专注于集群自动化部署,服务管理,资源自动化调度等方向。 今天会和大家分享三个主题。 不过限于时间,第三个只是会简单提及下, 等未来有机会可以更详细的分享。 如何基于Spark做机器学习(Spark-Shell其实也算的上即席查询了) 基于Spark做新词发现(依托Spark的强大计算能力) 基于Spark做智能问答(Spark上的算法支持) 其中这些内容在我之前写的一篇描述工作经历的文章 我的工作都有提及到,当然,可能不如今天分享的这么详细。 如何基于spark做机器学习 Spark发展到1.5版本,算是全平台了,实时批计算,批处理,算法库,SQL,hadoop能做的,基本他都能做,而且做的比Hadoop好。 当然,这里我要提及的是,Spark依然是Hadoop生态圈的一员,他替换的也仅仅是MR的计算模型而已。资源调度依赖于Yarn,存储则依赖于...
- 下一篇
Spark Streaming Direct Approach (No Receivers) 分析
前言 这个算是Spark Streaming 接收数据相关的第三篇文章了。 前面两篇是: Spark Streaming 数据产生与导入相关的内存分析 Spark Streaming 数据接收优化 Spark Streaming 接受数据的方式有两种: Receiver-based Approach Direct Approach (No Receivers) 上面提到的两篇文章讲的是 Receiver-based Approach 。 而这篇文章则重点会分析Direct Approach (No Receivers) 。 个人认为,DirectApproach 更符合Spark的思维。我们知道,RDD的概念是一个不变的,分区的数据集合。我们将kafka数据源包裹成了一个KafkaRDD,RDD里的partition 对应的数据源为kafka的partition。唯一的区别是数据在Kafka里而不是事先被放到Spark内存里。其实包括FileInputStream里也是把每个文件映射成一个RDD,比较好奇,为什么一开始会有Receiver-based Approach,额外添加了Rec...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Linux系统CentOS6、CentOS7手动修改IP地址
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Windows10,CentOS7,CentOS8安装Nodejs环境