SparkStreaming+Kafka
摘自 :
Spark踩坑记——Spark Streaming+Kafka
SpringStreaming+Kafka
1.SpringStreaming+Kafka 接受数据和发送数据
(1)SparkStreaming 接受kafka方式
- 基于Received的方式
![]()
- 基于DirectKafkaStreaming
![]()
DirectKafkaStreaming 相比较 ReceiverKafkaStreaming
- 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
- 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
- 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。
(2)Spark 发送数据至Kafka中
一般处理方式 : 在RDD.forpartition进行操作
input.foreachRDD(rdd => // 不能在这里创建KafkaProducer rdd.foreachPartition(partition => partition.foreach{ case x:String=>{ val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") println(x) val producer = new KafkaProducer[String,String](props) val message=new ProducerRecord[String, String]("output",null,x) producer.send(message) } } ) )
此方式的缺点在于每次foreach操作都需要重新创建一次kafkaProduce 主要花费时间都在 创建连接的时候.
基于此我们以以下方式进行操作
- 首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:
import java.util.concurrent.Future import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata } class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable { /* This is the key idea that allows us to work around running into NotSerializableExceptions. */ lazy val producer = createProducer() def send(topic: String, key: K, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, key, value)) def send(topic: String, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, value)) } object KafkaSink { import scala.collection.JavaConversions._ def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = { val createProducerFunc = () => { val producer = new KafkaProducer[K, V](config) sys.addShutdownHook { // Ensure that, on executor JVM shutdown, the Kafka producer sends // any buffered messages to Kafka before shutting down. producer.close() } producer } new KafkaSink(createProducerFunc) } def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap) }
- 之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:
// 广播KafkaSink val kafkaProducer: Broadcast[KafkaSink[String, String]] = { val kafkaProducerConfig = { val p = new Properties() p.setProperty("bootstrap.servers", Conf.brokers) p.setProperty("key.serializer", classOf[StringSerializer].getName) p.setProperty("value.serializer", classOf[StringSerializer].getName) p } log.warn("kafka producer init done!") ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig)) }
- 这样我们就能在每个executor中愉快的将数据输入到kafka当中:
//输出到kafka segmentedStream.foreachRDD(rdd => { if (!rdd.isEmpty) { rdd.foreach(record => { kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2) // do something else }) } })
2.Spark streaming+Kafka调优
2.1 批处理时间设置
参数设置:
2.2 合理的Kafka拉取量
参数设置: spark.streaming.kafka.maxRatePerPartition
2.3 缓存反复使用的Dstream(RDD)
DStream.cache()
2.4 设置合理的GC
长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
2.5 设置合理的CPU资源数
CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。
2.6设置合理的parallelism
partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。
在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。
2.7使用高性能的算子
- 使用reduceByKey/aggregateByKey替代groupByKey
- 使用mapPartitions替代普通map
- 使用foreachPartitions替代foreach
- 使用filter之后进行coalesce操作
- 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
【实战】ElasticSearch的备份与恢复
早先时候我们讲了关于HDFS的备份方法,作为Hadoop在搜索层级的演进应用,Elasticsearch可以接入多种异构的数据平台(结构化/非结构化,流式/批量的),加之有丰富的管理工具,在很短时间内迅速获得了很多有EDW需求的公司的青睐。 在Elasticsearch的架构中,由Logstash负责采集客户端数据(类似于Hadoop生态系统中的Flume,当然Flume和Hadoop也可以直接作为Elasticsearch的数据源),再由Elasticsearch做即时分析与处理,最后交由基于现代化HTML5的web应用-Kibana将分析结果作可视化展现。 说到Elasticsearch的用户,刚被微软以75亿美元天价收购的Github就是一个典型案例,此外还有Facebook、WordPress、stackoverflow等超大型社交类应用及社区。Elasticsearch不仅给这些用户提供了冗余、可扩展的数据系统,并且通过实时索引,有效地将原先长达几十分钟的搜索体验缩短至秒级。通过轻型的Elasticsearch-Hadoop库可以与现有的Hadoop平台无缝对接,并且可以实现...
- 下一篇
ElasticSearch入门 附.Net Core例子
1.什么是ElasticSearch? Elasticsearch是基于Lucene的搜索引擎。它提供了一个分布式,支持多租户的全文搜索引擎,它具有HTTP Web界面和无模式JSON文档。 Elasticsearch是用Java开发的,根据Apache许可条款作为开源发布。 ----来自维基百科的解释 我个人的理解是Elasticsearch(以下简称ES)是一个支持分布式的全文搜索引擎,因为在海量数据搜索时,普通关系型、非关系型数据库因为IO读取、处理器运算能力的限制,导致查询效率难以提升,但是ES是分布式的(能把处理压力分摊给每个节点),而且它是给每个词创建索引,所以查询效率极高,堪称即时搜索。 而且ES能搭配Kibana,实现数据的可视化管理与数据分析。 Kibana仪表盘 2.ES中名词概念 2.1 Node和Cluster 前面所过ES是一个分布式搜索引擎,其本质是一个分布式数据库,可以多台计算机上的ES实例协同工作,这里面的某一台计算机上的某个ES实例,就可以称为一个Node(节点),所有的这些协同工作的实例,可以称为一个Cluster(集群)。 2.2 Index E...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Hadoop3单机部署,实现最简伪集群
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库