VectorizedReader 和 ORC
spark SQL not only SQL
1.SparkSession/DataFrame/Datasets API 2.Catalyst Optimization & Tungsten Execution 3.DataSource Connectors/ Spark Core(RDD API)
优化尽可能的发生晚些,因为spark SQL,可以通过函数和库优化
整体的优化使用库和sql/dataframe
RUN EXPLAIN plan Interpret plan tune plan
optimizer:
使用启发式和代价重写查询计划
column pruning:列裁剪, outer join elimination:消除outer join Predicate push down:谓词下推, constraint propagation:约束传播(broadcast) constant floding:常量累加: join reordering: join重排序 ..... spark.sql.autoBroadcastJoin.threshold keep the statistics updated broadcastJoin Hint memory manager: 跟踪内存的使用,有效的分配内存在task和算子 code generator: 编译物理计划到优化后的java代码 Tungsten Engine: 高效的二进制数据格式和数据结构对cpu和内存的高效。 调整spark.sql.codegen.hugeMethodLimit去避免较大的方法(>8k), 因为这不能被JIT编译器 所编译。
spark分为计算和存储:
完整的数据流: 外部存储给spark 喂数据 spark处理数据 如果spark处理数据很快,数据源就可能称为瓶颈。 更高效的去读取柱状的向量化数据 更高效的使用jvm生成simd 说明 指定的文件系统可以完成跳过不必要的数据和预shuffle,可以通过不必要的shuffle和IO来加速查询 选择支持向量化读取的数据源(parquet,orc) 基于文件的数据源,尽可能的创建分区,桶。
Spark 2.3.0支持ORC Vectorized矢量化源码分析
在Spark2.3.0的release文档中,提到ORC Vectored带来的性能提升:
提高scan吞吐2-5倍;
开启条件:spark.sql.orc.impl=native;
ORC 文件类型
当然该ISSUE的提出还是有些背景的(https://issues.apache.org/jira/browse/SPARK-16060),ORC文件格式本身是Hortonworks提出的针对Hive查询的一种列式存储方案,ORC是在一定程度上扩展了RCFile,是对RCFile的优化。有别于Facebook的RCFile类型,ORC有如下优点:
- ORCFile在RCFile基础上引申出来Stripe和Footer等。每个ORC文件首先会被横向切分成多个Stripe,而每个Stripe内部以列存储,所有的列存储在一个文件中,而且每个stripe默认的大小是250MB,相对于RCFile默认的行组大小是4MB,所以比RCFile更高效;
- ORCFile扩展了RCFile的压缩,除了Run-length(游程编码),引入了字典编码和Bit编码;
- ORCFile保存了文件更多的元信息;
其存储格式如下:
IndexData中保存了该stripe上数据的位置信息,总行数等信息 RowData以stream的形式保存了数据的具体信息 Stripe Footer中包含该stripe的统计结果,包括Max,Min,count等信息 IndexData RowData StripeFooter ... FileFooter中包含该表的统计结果,以及各个Stripe的位置信息 Postscripts中存储该表的行数,压缩参数,压缩大小,列等信息
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
ORC Vectored使用场景
spark.sql.orc.impl=native的判断
其中spark.sql.orc.impl
有native和hive两种选择,如果针对orc类型选择hive格式直接调用org.apache.spark.sql.hive.orc.OrcFileFormat
类实现类的加载,而如果为native则会基于org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
类进行加载。
/** Given a provider name, look up the data source class definition. */ def lookupDataSource(provider: String, conf: SQLConf): Class[_] = { val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match { case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" => classOf[OrcFileFormat].getCanonicalName case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" => "org.apache.spark.sql.hive.orc.OrcFileFormat" case name => name } ... }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
什么时候支持ORC Vectored?
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { val conf = sparkSession.sessionState.conf conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields && schema.forall(_.dataType.isInstanceOf[AtomicType]) }
- 1
- 2
- 3
- 4
- 5
- 6
需要满足以下条件:
* 开启spark.sql.orc.enableVectorizedReader
: 默认true;
* 开启spark.sql.codegen.wholeStage
: 默认true并且其scheme的长度不大于wholeStageMaxNumFields(默认100列);
* [关键]所有列数据类型需要为AtomicType类型的;
AtomicType类型,可根据定义查看:
/** * An internal type used to represent everything that is not null, UDTs, arrays, structs, and maps. */ protected[sql] abstract class AtomicType extends DataType { private[sql] type InternalType private[sql] val tag: TypeTag[InternalType] private[sql] val ordering: Ordering[InternalType] }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
AtomicType代表了非 null/UDTs/arrays/structs/maps类型。所以如果所含列中如果包含null/UDTs/arrays/structs/maps类型,依然无法收到该ISSUE的便利。
ORC Vectored实现
OrcColumnarBatchReader的使用
在OrcFileFormat.buildReaderWithPartitionValues中:
if (enableVectorizedReader) { val batchReader = new OrcColumnarBatchReader( enableOffHeapColumnVector && taskContext.isDefined, copyToSpark) // SPARK-23399 Register a task completion listener first to call `close()` in all cases. // There is a possibility that `initialize` and `initBatch` hit some errors (like OOM) // after opening a file. val iter = new RecordReaderIterator(batchReader) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close())) // 调用initialize函数 batchReader.initialize(fileSplit, taskAttemptContext) // 调用initBatch batchReader.initBatch( reader.getSchema, requestedColIds, requiredSchema.fields, partitionSchema, file.partitionValues) // 生成iter iter.asInstanceOf[Iterator[InternalRow]] } else { val orcRecordReader = new OrcInputFormat[OrcStruct] .createRecordReader(fileSplit, taskAttemptContext) val iter = new RecordReaderIterator[OrcStruct](orcRecordReader) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close())) val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) val deserializer = new OrcDeserializer(dataSchema, requiredSchema, requestedColIds) if (partitionSchema.length == 0) { iter.map(value => unsafeProjection(deserializer.deserialize(value))) } else { val joinedRow = new JoinedRow() iter.map(value => unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues))) } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
OrcColumnarBatchReader的实现
initialize(): 初始化OrcFile Reader及Hadoop环境配置;
initBatch(): 初始化batch变量和columnarBatch变量(其中batch为ORC Reader矢量化每次读取的结果存储变量,columnarBatch为codegen转换为Spark定义类型存储变量 );
nextBatch(): 迭代器,其核心还是调用ORC自定义的vectored函数,需要根据类型转换Spark定义type;
单元测试
参考: org.apache.spark.sql.hive.orc.OrcReadBenchmark
结果分析:
针对数字、String类型测试
Native ORC Vectorized > Native ORC Vectorized with copy > Native ORC MR > Hive built-in ORC
针对分区、不分区测试
Partition性能远远>不分区性能
参考:
- http://spark.apache.org/releases/spark-release-2-3-0.html
- https://www.cnblogs.com/ITtangtang/p/7677912.html
- RCFile和ORCFile: http://blog.csdn.net/u014307117/article/details/52381383
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80981101 提示:阅读本文前最好先阅读《Spark2.1.0之内置RPC框架》和《spark2.1.0之源码分析——RPC配置TransportConf》。 TransportClientFactory是创建传输客户端(TransportClient)的工厂类。在说明《Spark2.1.0之内置RPC框架》文中的图1中的记号①时提到过TransportContext的createClientFactory方法可以创建TransportClientFactory的实例,其实现见代码清单1。 代码清单1 创建客户端工厂 public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) { return new TransportClientFactory(this, bootstraps); } pub...
- 下一篇
大数据计算杭州高端峰会—探寻真正的“云数据,大计算”
阿里巴巴大数据计算服务MaxCompute携手阿里云MVP邀您共赴杭州大数据计算高端峰会,探寻真正的“云数据,大计算” 现在报名>>> 你的业务数据还在沉睡吗? 如何让数据发挥更大的价值? 数据是企业无价之宝,上云真能保证安全吗? 如何省去自建环境、省去运维,快速实现大数据平台落地,更多聚焦于业务? 我的业务离数据智能有多远?。。。。。。 你也有这些需求和疑惑吗?如果你在杭州,那就来现场倾听、交流,让MaxCompute为你答疑解惑。 【活动介绍】MaxCompute(原ODPS)是一种安全可靠、高效能、低成本、从GB到EB级别的大数据计算服务。能够快速解决用户海量数据的计算问题,有效降低企业大数据计算平台的总体拥有成本,提升大数据应用开发效率,保障数据的云上安全。作为新一代计算引擎,MaxCompute 致力于帮助企业发掘更大
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Linux系统CentOS6、CentOS7手动修改IP地址
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装