大数据实时流处理零数据丢失
大数据实时流处理零数据丢失
1.整体流程:
a)kafka:作为流处理程序的生产者 b)sparkStreaming:作为消费者,设置合理batch c)DB:输出到redis/ES
2.存在问题:
雪崩效应: kill 出现,导致的数据丢失 sparkStreaming程序挂掉了,到知道的数据丢失 解决: 1.使用checkpoint。维护太麻烦,流程序修改后需要删除checkpoint下的数据才可以,但是这回导致数据丢失或者重复。 2.官方建议:同时启动新旧两个流程序,然后认为关掉旧的,并记录偏移量,然后新的流程序从指定的偏移量消费。 3.自己保存偏移量到外部设备,每次启动流程序先读取外设,然后判断,offset的读取方式是earliest 还是指定偏移量 4.配置kafak限速参数,以及流程序的序列化方式。 5.关于偏移量的存储,可以存mysql,redis,HBASE,kafka,zk。。。 6.关于数据重复,为了达到exactly once, 需要跟实际的业务结合。例如最后的结果如果是写入到HBase,可以将偏移量作为HBase对应的业务表中的一个列,实现事务性,幂等。
代码:
1.pom文件: <properties> <scala.version>2.11.8</scala.version> <spark.version>2.3.1</spark.version> <kafka.version>0.10.0.0</kafka.version> </properties> <repositories> <repository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </repository> <repository> <id>scalikeJDBC</id> <name>scalikeJDBC</name> <url>https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>${spark.version}</version> </dependency> <!--sparksql -kafka--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <!--json解析--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.36</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.40</version> </dependency> <!--redis--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <!--config--> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.3.1</version> </dependency> <!--RDBMS访问--> <dependency> <groupId>org.scalikejdbc</groupId> <artifactId>scalikejdbc_2.11</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.scalikejdbc</groupId> <artifactId>scalikejdbc-core_2.11</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.scalikejdbc</groupId> <artifactId>scalikejdbc-config_2.11</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency>
2.kafka topic的创建:
a) bin/kafka-topics.sh --create --zookeeper pj-01:2181 --replication-factor 1 --partitions 1 --topic saprkProcess b) 修改kafka的分区数目:提高spark的并行度,提升消息的处理吞吐量。 bin/kafka-topics.sh --alter --zookeeper pj-01:2181 --topic saprkProcess --partitions 3 c) note:在0.8版本,修改完kafka 对应的topic的partition的后,流程序是侦测不到,导致数据丢失。其实,在kafka的logs下面数据已经有了,所以需要在代码中判断是否有新的分区,并获取到新的分区,然后将其偏移量初始化为0. 但是在0.10版本后,就不存在这个问题了。
3.模拟kafka生产者:
package Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.Random; import java.util.UUID; public class KafkaProducerApp { public static void main(String[] args) throws InterruptedException { final Properties props = new Properties(); //Assign topicName to string variable String topics = "saprkProcess"; // create instance for properties to access producer configs //Assign localhost id props.put("bootstrap.servers", "pj-01:9092"); //Set acknowledgements for producer requests. props.put("acks", "all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); String value[] = {"alibaba","baidu","tencent","facebook","amazon","apple","google","linkedin","twitter","ant","ICBC","Lakers","cleveland"}; int i = value.length; final Random random = new Random(); int ret = 100; while (ret>=0) { Thread.sleep(10); final ProducerRecord<String, String> msg = new ProducerRecord<String, String>( topics // , (new Random()).nextInt(3) ,0 ,System.currentTimeMillis() , UUID.randomUUID().toString().substring(6, 14).replace("-", "") , value[random.nextInt(i)] ); producer.send(msg); System.out.println("msg = " + "alibaba -A: "+msg); ret --; } } }
4.创建:mysql 偏移量表:
a)这里MySQL相关操作使用的scalikeJDBC,这个是针对scala的一款db库,操作api简单明了,默认数据库配置通过读取resources下的(applicaiton.conf/application.json,applicaiton.properties)文件。 b)更新偏移量的api使用replace into: note:如果没有设置主键,默认replace操作就是新增,不会与表中已存在的数据进行逻辑比对。 01:所以下面建表语句中:使用topic,groupid,partition作为联合主键,来确定一条记录。 02:因为我们的流程序一经启动:一般就是7*24,所以表名暂定为streaming_offset_24。这个自定义程度很高。 建表语句: create table streaming_offset_24(topic varchar(50),groupid varchar(20), partitions int, offset bigint, primary key(topic,groupid,partitions));
5.流程序: spark streaming 处理demo:
package pjnet import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} import scalikejdbc.config.DBs import scalikejdbc.{DB, _} object StreamingProApp { Logger.getLogger("org.apache").setLevel(Level.WARN) //设置日志显示 def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("StreamingProApp") //指定每秒钟每个分区kafka拉取消息的速率 .set("spark.streaming.kafka.maxRatePerPartition", "100") // 修改序列化为Kyro,减少shuffle量 .set("spark.serilizer", "org.apache.spark.serializer.KryoSerializer") // 开启rdd的压缩 .set("spark.rdd.compress", "true") // 设置批次的处理时间 val ssc = new StreamingContext(conf, Seconds(10)) //一参数设置 val groupId = "1" //也可以通过读取配置文件,使用typesafe的 ConfigFactory读取 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "pj-01:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> groupId, "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (false: java.lang.Boolean) //自己维护偏移量。连接kafka的集群。 ) val topics = Array("saprkProcess") //二参数设置 DBs.setup() val fromdbOffset: Map[TopicPartition, Long] = DB.readOnly { implicit session => SQL(s"select * from `streaming_offset_24` where groupId = '${groupId}'") .map(rs => (new TopicPartition(rs.string("topic"), rs.int("partitions")), rs.long("offset"))) .list().apply() }.toMap //程序启动,拉取kafka的消息。 val stream = if (fromdbOffset.size == 0) { KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) } else { KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](fromdbOffset.keys, kafkaParams, fromdbOffset) ) } stream.foreachRDD({ rdd => val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //数据处理 val resout: RDD[(String, Int)] = rdd.flatMap(_.value().split(" ")).map((_, 1)).reduceByKey(_ + _) //可以将结果保存至redis,hbase等db resout.foreach(println) // resout.foreachPartition({ //// it => //// val jedis = RedisUtils.getJedis //// it.foreach({ //// va => //// jedis.hincrBy(field,val1,val2) //// }) //// jedis.close() // // }) //偏移量存入mysql,使用scalikejdbc框架,下面两种方式都可以。
//note: localTx is transactional, if metric update or offset update fails, neither will be committed DB.localTx { implicit session => for (or <- offsetRanges) { SQL("replace into `streaming_offset_24`(topic,groupId,partitions,offset) values(?,?,?,?)") .bind(or.topic,groupId, or.partition, or.untilOffset).update().apply() } } // offsetRanges.foreach(osr => { // DB.autoCommit{ implicit session => // sql"REPLACE INTO streaming_offset_24(topic, groupid, partitions, offset) VALUES(?,?,?,?)" // .bind(osr.topic, groupId, osr.partition, osr.untilOffset).update().apply() // } // }) }) stream.count().print() ssc.start() ssc.awaitTermination() } }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Kubernetes-在Kubernetes集群上搭建Stateful Elasticsearch集群
准备工作 Elasticsearch镜像,我以Elasticsearch官方镜像的5.6.10版本为基础创建的。 Dockerfile FROM docker.elastic.co/elasticsearch/elasticsearch:5.6.10 MAINTAINER leo.lee(lis85@163.com) WORKDIR /usr/share/elasticsearch USER root # copying custom-entrypoint.sh and configuration (elasticsearch.yml, log4j2.properties) # to their respective directories in /usr/share/elasticsearch (already the WORKDIR) COPY custom-entrypoint.sh bin/ COPY elasticsearch.yml config/ COPY log4j2.properties config/ # assuring "elasticsearch" user ...
- 下一篇
HBase查询优化之Short-Circuit Local Reads
1.概述 在《HBase查询优化》一文中,介绍了基于HBase层面的读取优化。由于HBase的实际数据是以HFile的形式,存储在HDFS上。那么,HDFS层面也有它自己的优化点,即:Short-Circuit Local Reads。本篇博客笔者将从HDFS层面来进行优化,从而间接的提升HBase的查询性能。 2.内容 Hadoop系统在设计之初,遵循一个原则,那就是移动计算的代价比移动数据要小。故Hadoop在做计算的时候,通常是在本地节点上的数据中进行计算。即计算和数据本地化。流程如下图所示: 在最开始的时候,短回路本地化读取和跨节点的读取的处理方式是一样的,流程都是先从DataNode读取数据,然后通过RPC服务把数据传输给DFSClient,这样处理虽然流程比较简单,但是读取性能会受到影响,因为跨节点读取数据,需要经过网络将一个DataNode的数据传输到另外一个DataNode节点(一般来说,HDFS有3个副本,所以,本地取不到数据,会到其他DataNode节点去取数据)。 2.1 方案一:客户端直接读取DataNode文件 短回路本地化读取的核心思想是,由于客户端和数据...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS8编译安装MySQL8.0.19
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Windows10,CentOS7,CentOS8安装Nodejs环境