spark RDD,reduceByKey vs groupByKey
Spark 中有两个类似的api,分别是 reduceByKey 和 groupByKey 。这两个的功能类似,但底层实现却有些不同,那么为什么要这样设计呢?我们来从源码的角度分析一下。
先看两者的调用顺序(都是使用默认的Partitioner,即defaultPartitioner)
所用 spark 版本:spark 2.1.0
先看reduceByKey
Step1
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { reduceByKey(defaultPartitioner(self), func) }
Setp2
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }
Setp3
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("HashPartitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } }
姑且不去看方法里面的细节,我们会只要知道最后调用的是 combineByKeyWithClassTag 这个方法。这个方法有两个参数我们来重点看一下,
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)
首先是 partitioner 参数 ,这个即是 RDD 的分区设置。除了默认的 defaultPartitioner,Spark 还提供了 RangePartitioner 和 HashPartitioner 外,此外用户也可以自定义 partitioner 。通过源码可以发现如果是 HashPartitioner 的话,那么是会抛出一个错误的。
然后是 mapSideCombine 参数 ,这个参数正是 reduceByKey 和 groupByKey 最大不同的地方,它决定是是否会先在节点上进行一次 Combine 操作,下面会有更具体的例子来介绍。
然后是groupByKey
Step1
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope { groupByKey(defaultPartitioner(self)) }
Step2
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }
Setp3
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("HashPartitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } }
结合上面 reduceByKey 的调用链,可以发现最终其实都是调用 combineByKeyWithClassTag 这个方法的,但调用的参数不同。
reduceByKey的调用
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
groupByKey的调用
combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
正是两者不同的调用方式导致了两个方法的差别,我们分别来看
- reduceByKey的泛型参数直接是[V],而groupByKey的泛型参数是[CompactBuffer[V]]。这直接导致了 reduceByKey 和 groupByKey 的返回值不同,前者是RDD[(K, V)],而后者是RDD[(K, Iterable[V])]
- 然后就是mapSideCombine = false 了,这个mapSideCombine 参数的默认是true的。这个值有什么用呢,上面也说了,这个参数的作用是控制要不要在map端进行初步合并(Combine)。可以看看下面具体的例子。
从功能上来说,可以发现 ReduceByKey 其实就是会在每个节点先进行一次合并的操作,而 groupByKey 没有。
这么来看 ReduceByKey 的性能会比 groupByKey 好很多,因为有些工作在节点已经处理了。那么 groupByKey 为什么存在,它的应用场景是什么呢?我也不清楚,如果观看这篇文章的读者知道的话不妨在评论里说出来吧。非常感谢!
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
记一次HBase内存泄漏导致RegionServer挂掉问题
问题描述 在测试Phoenix稳定性时,发现HBase集群其中一台RegionServer节点FullGC严重,隔一段时间就会挂掉。 HBase集群规格 选项 Master RegionServer 节点数 2台,一主一备 3台 CPU核数 2core 4core 存储 SSD云盘,单节点440G 初步分析 使用jstat监控RegionServer的Heap size和垃圾回收情况: [root@iZbp18zqovyu9djsjb05dzZ ~]# jstat -gcutil 454 5000 S0 S1 E O M CCS YGC YGCT FGC FGCT GCT 100.00 0.00 55.68 90.19 98.75 97.30 2244 57.3
- 下一篇
HDP2.6 Hadoop如何支持读写OSS
HDP和Ambari HDP(Hortonworks Data Platform)是由Hortonworks发行的大数据平台,里面包含了Hadoop、Hive、HBase等很多开源组件,目前有不少用户直接使用HDP版本的Hadoop。Ambari是一个分布式工具,可以安装、管理,监控HDP平台。HDP与Ambari的关系,可以类比CDH与CM的关系。目前,HDP的最新版本是3.0.1,里面的Hadoop版本是3.1.1,天然支持了OSS。本文主要介绍如何使低版本的HDP(以HDP2.6.1.0为例)支持读写OSS。 HDP2.6.1.0支持读写OSS HDP2.6.1.0中,Hadoop的版本是2.7.3,还不支持OSS(目前Apache Hadoop支持OSS的最低版本是2.9.1)。 搭建HDP集群 通过官方文档,利用Ambari搭建H
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Windows10,CentOS7,CentOS8安装Nodejs环境
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Hadoop3单机部署,实现最简伪集群
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS8编译安装MySQL8.0.19