Spark Streamming+Kafka提交offset实现有且仅有一次
我的原创地址:https://dongkelun.com/2018/06/20/sparkStreamingOffsetOnlyOnce/
前言
本文讲Spark Streamming使用Direct方式读取Kafka,并在输出(存储)操作之后提交offset到Kafka里实现程序读写操作有且仅有一次,即程序重启之后之前消费并且输出过的数据不再重复消费,接着上次消费的位置继续消费Kafka里的数据。
Spark Streamming+Kafka官方文档:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
1、提交offset的程序
根据官方文档可知,在spark代码里可以获取对应的offset信息,并且可以提交offset存储到kafka中。
代码:
package com.dkl.leanring.spark.kafka
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.TaskContext
object KafkaOffsetDemo {
def main(args: Array[String]) {
//创建sparkConf
val sparkConf = new SparkConf().setAppName("KafkaOffsetDemo").setMaster("local[2]")
// 创建StreamingContext batch size 为 1秒
val ssc = new StreamingContext(sparkConf, Seconds(1))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "ambari.master.com:6667", //kafka集群地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "KafkaOffsetDemo", //消费者组名
"auto.offset.reset" -> "earliest", //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
"enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
val topics = Array("top1") //消费主题
//创建DStream,返回接收到的输入数据
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
// 打印获取到的数据,因为1秒刷新一次,所以数据长度大于0时才打印
stream.foreachRDD(f => {
if (f.count > 0) {
println("=============================")
println("打印获取到的kafka里的内容")
f.foreach(f => {
val value = f.value()
println(value)
})
println("=============================")
println("打印offset的信息")
// offset
val offsetRanges = f.asInstanceOf[HasOffsetRanges].offsetRanges
//打印offset
f.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
println("=============================")
// 等输出操作完成后提交offset
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
})
//启动
ssc.start()
//等待停止
ssc.awaitTermination()
}
}
说明:
* auto.offset.reset设置为earliest,即当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始,这样设置的目的是为了一开始可以获取到kafka对应主题下的所有的历史消息。
* enable.auto.commit 设置为false,如果是true,则这个消费者的偏移量会在后台自动提交,这样设置目的是为了后面自己提交offset,因为如果虽然获取到了消息,但是后面的转化操作并将结果写到如hive中并没有完成程序就挂了的话,这样是不能将这次的offset提交的,这样就可以等程序重启之后接着上次失败的地方继续消费
* group.id 是不能变得,也就是offset是和topic和group绑定的,如果换一个group的话,程序将从头消费所有的历史数据
* 这个api是将offset存储到kakfa的一个指定的topic里,名字为__consumer_offsets,而不是zookeeper中
2、测试程序
1、首先创建对应的topic
2、生产几条数据作为历史消息
bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic top1
3、启动上面的程序
4、继续生产几条数据
接下来先看一下结果:
由图可得,这样可以将历史数据全部打印出来,并且后面实时增加的数据,也打印出来了,且可以看到offset是在增加的,最后一个offset是202,那么接下来测试一下程序重启之后是否会接着之前的数据继续消费呢
5、停止程序
6、生产几条数据
7、启动程序
看一下结果:
可以看出,程序确实是接着上次消费的地方消费的,为了证实这一点,我将earliest和offset圈了起来,从offset可以看到是从上次的202开始消费的。
3、关于offset过期时间
kafka offset默认的过期时间是一天,当上面的程序挂掉,一天之内没有重启,也就是一天之内没有保存新的offset的话,那么之前的offset就会被删除,再重启程序,就会从头开始消费kafka里的所有历史数据,这种情况是有问题的,所以可以通过设置offsets.retention.minutes自定义offset过期时间,该设置单位为分钟,默认为1440。
修改kafka的offset过期时间详细信息见:https://dongkelun.com/2018/06/21/modifyKafkaOffsetTime/
4、自己保存offset
可以通过自己保存offset的信息到数据库里,然后需要时再取出来,根据得到的offset信息消费kafka里的数据,这样就不用担心offset的过期的问题了,因为没有自己写代码实现,所以先给出官网的示例代码:
// The details depend on your data store, but the general idea looks like this
// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
云数据库产品月刊·6月刊
【云数据库大事件】 阿里云云数据库Redis团队获得“开源社区技术贡献领袖奖” LC3开源峰会(LinuxCon + ContainerCon + CloudOpen)25-27日在北京国家会议中心举行,做为集三大行业知名大会于一身的盛会,LC3让数千位开发人员、运营专家、企业、合规人员、法律人士和其他专业人士汇聚一堂,国内外多家名企都现身大会做讲座宣传,本次发布会上,彭立勋做了AliSQL的演讲,在2016 云栖大会阿里云宣布开源AliSQL之后,当前在GitHub上已经累计发布了10个版本了。另外MongoShake跨数据中心做数据复制,是做容灾和多活的重点工具,此次进行开源,吸引了大量客户的关注。发布会上,中国Redis技术社区对阿里云等三家公司进行了颁奖,阿里云云数据库Redis团队获得“开源社区技术贡献领袖奖”,社区主席重点就云数据库Redis团队为redis开源做出的贡献表示感谢。会议的详情视频回顾资料及PPT等资料等汇总请见:https://yq.aliyun.com/webinar/play/459 阿里云在LC3开源峰会上宣布正式开源基于MongoDB的容灾和多活解...
-
下一篇
SparkStreaming+Kafka 实现统计基于缓存的实时uv
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80943294 我的原创地址:https://dongkelun.com/2018/06/25/KafkaUV/ 前言 本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实只要按照WordCount的思路,最后输出key的数量即可,所以可以利用SparkStreaming+Kafka 实现基于缓存的实时wordcount程序,这里稍加改动,如果uv数量增加的话就打印uv的数量(key的数量)。 1、数据 数据是我随机在kafka里生产的几条,用户以空格区分开(因为用的之前单词统计的程序) 2、kafka topic 首先在kafka建一个程序用到topic:KafkaUV bin/kafka-topics.sh --create --zookeeper localhost:2181 -...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2全家桶,快速入门学习开发网站教程
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程