SparkStreaming 手动维护kafka Offset到Mysql实例
官网详解地址
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
手动提交offset,以保证数据不会丢失,尤其是在网络抖动严重的情况下,但是如果kafka挂掉重启后,可能会造成一些其他问题,
例如找不到保存的offset,这个具体问题再具体分析,先上代码。
import java.sql.{DriverManager, ResultSet}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{OffsetRange, _}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
*
- 使用Spark-Kafka-0-10版本整合,并手动提交偏移量,维护到MySQL中
*/
object SparkKafkaTest2 {
def main(args: Array[String]): Unit = {
//1.创建StreamingContext val conf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val ssc = new StreamingContext(sc,Seconds(5)) //准备连接Kafka的参数 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "server1:9092,server2:9092,server3:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "SparkKafkaTest", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean)
val topics = Array("spark_kafka_test").toSet
val recordDStream: DStream[ConsumerRecord[String, String]] = if (offsetMap.size > 0) { //有记录offset println("MySQL中记录了offset,则从该offset处开始消费") KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, //位置策略,源码强烈推荐使用该策略,会让Spark的Executor和Kafka的Broker均匀对应 Subscribe[String, String](topics, kafkaParams, offsetMap)) //消费策略,源码强烈推荐使用该策略 } else { //没有记录offset println("没有记录offset,则直接连接,从latest开始消费") KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, //位置策略,源码强烈推荐使用该策略,会让Spark的Executor和Kafka的Broker均匀对应 Subscribe[String, String](topics, kafkaParams)) //消费策略,源码强烈推荐使用该策略 } recordDStream.foreachRDD { messages => if (messages.count() > 0) { //当前这一时间批次有数据 messages.foreachPartition { messageIter => messageIter.foreach { message => //println(message.toString()) } } val offsetRanges: Array[OffsetRange] = messages.asInstanceOf[HasOffsetRanges].offsetRanges for (o <- offsetRanges) { println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}") } //手动提交offset,默认提交到Checkpoint中 //recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) //实际中偏移量可以提交到MySQL/Redis中 saveOffsetRanges("SparkKafkaTest", offsetRanges) } }
ssc.start()
ssc.awaitTermination()
}
/**
- 从数据库读取偏移量
*/
def getOffsetMap(groupid: String, topic: String) = {
Class.forName("com.mysql.jdbc.Driver") val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web") val sqlselect = connection.prepareStatement(""" select * from kafka_offset where groupid=? and topic =? """) sqlselect.setString(1, groupid) sqlselect.setString(2, topic) val rs: ResultSet = sqlselect.executeQuery() val offsetMap = mutable.Map[TopicPartition, Long]() while (rs.next()) { offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset") } rs.close() sqlselect.close() connection.close() offsetMap
}
/**
- 将偏移量保存到数据库
*/
def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web") //replace into表示之前有就替换,没有就插入 val select_ps = connection.prepareStatement(""" select count(*) as count from kafka_offset where `groupid`=? and `topic`=? and `partition`=? """) val update_ps = connection.prepareStatement(""" update kafka_offset set `offset`=? where `groupid`=? and `topic`=? and `partition`=? """) val insert_ps = connection.prepareStatement(""" INSERT INTO kafka_offset(`groupid`, `topic`, `partition`, `offset`) VALUE(?,?,?,?) """) for (o <- offsetRange) { select_ps.setString(1, groupid) select_ps.setString(2, o.topic) select_ps.setInt(3, o.partition) val select_resut = select_ps.executeQuery() // println(select_resut.)// .getInt("count")) while (select_resut.next()) { println(select_resut.getInt("count")) if (select_resut.getInt("count") > 0) { //update update_ps.setLong(1, o.untilOffset) update_ps.setString(2, groupid) update_ps.setString(3, o.topic) update_ps.setInt(4, o.partition) update_ps.executeUpdate() } else { //insert insert_ps.setString(1, groupid) insert_ps.setString(2, o.topic) insert_ps.setInt(3, o.partition) insert_ps.setLong(4, o.untilOffset) insert_ps.executeUpdate() } } } select_ps.close() update_ps.close() insert_ps.close() connection.close()
}
如果报错连不上数据库或连接数据库地址失败,请查看是否添加了mysql客户端jar包。
--------五维空间s
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Github架构师解读C/C++应用包管理的Why和How
一、背景 本文整理自Johannes Nicolai在JFrog 2019用户大会上的讲演《DevOps for Non-Hipsters(aka C/C++ programmers)》。 Johannes Nicolai是Github的解决方案架构师,主要负责德语区的用户。他和很多制造业的用户(多数使用C/C++)交流,询问他们在DevOps或持续交付方面的挑战,通常会得到如下的描述: 在嵌入式C/C++领域,花费几十个小时完成一个完整的DevOps流水线并不少见。为某一个提交运行单独的构建和测试几乎是不可能的,通常每次构建都包含了几百个同事所有的提交。而构建时间长的主要原因在于交付包包含了大量的依赖包,而每次构建这些依赖包都需要从头开始重新构建。上述的描述并不仅限于德语区,Johannes询问了美国制造业的用户,也得到了类似的反馈。 从业界的发展来看,声明式包管理能够很好的解决上述的问题。在交付包中通过声明描述所需的依赖包,在构建时根据声明从包管理系统中获取相应的依赖包,这样能够大大缩短构建时间。Java或JavaScript的开发者很熟悉这样的方式。 对于像Java或JavaS...
- 下一篇
【云栖号在线课堂】重磅推荐:对话达摩院自动驾驶实验室负责人王刚,揭秘自动驾驶之路上的“能”与“不能”
受疫情影响,如何轻松实现在家办公和学习?不用慌,云栖号在线课堂,每天都有产品技术专家分享,带你快速入门云计算!下面就给大家推荐关于CIO学院攻“疫”技术培训及2020大数据技术公开课第二季的精品课程! CIO学院攻“疫”技术培训 “开展技术普惠公益,与广大技术人共同学习成长!”是阿里CIO学院“技术攻疫(公益)大咖讲开设的初衷。该系列课程邀请到李飞飞、贾扬清、丁险峰、华先胜、王刚、金榕、小邪、五福、司罗、肖力、施尧耘、吴翰清等数十位技术大咖,与大家共同探讨人工智能、云计算、企业安全体系……的技术与实践。CIO学院攻“疫”技术培训第一期 达摩院自动驾驶实验室负责人王刚介绍无人驾驶的经典发展路线、面对的核心问题、最新到技术成果以及从黑科技到商业化的未来。阿里巴巴集团副总裁、达摩院高级研究员金榕从深度学习到AI三大关键技术方向,看AI技术应用中的困局、破局,以及未来。 【课程目录】 讲师 直播主题 观看视频 王刚 自动驾驶之路上的“能”与“不能” 点击观看 金榕 困局与破局:从深度学习到AI三大关键技术 点击观看 CIO学院攻“疫”技术培训第二期 云可以带来哪些切实的好处?如何用好云?在云的...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Red5直播服务器,属于Java语言的直播服务器
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作