Apache Spark 3.0 中的向量化 IO
本文转载自:过往记忆大数据
原文链接
R 是数据科学中最流行的计算机语言之一,专门用于统计分析和一些扩展,如用于数据处理和机器学习任务的 RStudio addins 和其他 R 包。此外,它使数据科学家能够轻松地可视化他们的数据集。
通过在 Apache Spark 中使用 SparkR,可以很容易地扩展 R 代码。要交互式地运行作业,可以通过运行 R shell 轻松地在分布式集群中运行 R 的作业。
当 SparkR 不需要与 R 进程交互时,其性能实际上与 Scala、Java 和 Python 等其他语言 API 相同。但是,当 SparkR 作业与本机 R 函数或数据类型交互时,会性能显著下降。
如果在 Spark 和 R 之间使用 Apache Arrow 来进行数据交换,其性能会有很大的提升。这篇博客文章概述了 SparkR 中 Spark 和 R 的交互,并对比了没有向量化执行和有向量化执行的性能差异。
Spark 和 R 交互
SparkR 不仅支持丰富的 ML 和类似 SQL 的 API 集合,而且还支持用于直接与 R 代码进行交互的一组 API。例如,Spark DataFrame 和 R DataFrame 之间的无缝转换以及在 Spark DataFrame 上以分布式的方式执行 R 内置函数。
在大多数情况下,Spark 中的其他语言 API 之间的性能实际上是一致的——例如,当用户代码依赖于 Spark UDF 或者 SQL API 时,执行过程完全在 JVM 中进行, I/O 方面没有任何性能损失。比如下面的两种调用时间都只需要一秒:
// Scala API // ~1 second sql("SELECT id FROM range(2000000000)").filter("id > 10").count() # R API # ~1 second count(filter(sql("SELECT * FROM range(2000000000)"), "id > 10"))
但是,在需要执行 R 的内置函数或将其从 R 内置类型转换到其他语言类型的情况下,其性能将有很大不同,如下所示。
// Scala API val ds = (1L to 100000L).toDS // ~1 second ds.mapPartitions(iter => iter.filter(_ < 50000)).count() # R API df <- createDataFrame(lapply(seq(100000), function (e) list(value=e))) # ~15 seconds - 15 times slower count(dapply( df, function(x) as.data.frame(x[x$value < 50000,]), schema(df)))
上面其实仅仅是对每个分区中过滤出小于 50000 的数据,然后对其进行 count 操作,但是 SparkR 却比 Scala 编写的代码慢 15 倍!
// Scala API // ~0.2 seconds val df = sql("SELECT * FROM range(1000000)").collect() # R API # ~8 seconds - 40 times slower df <- collect(sql("SELECT * FROM range(1000000)"))
上面这个例子情况更糟糕,其仅仅是将数据收集到 Driver 端,但是 SparkR 比 Scala 要慢 40 倍!
这是因为上面计算需要与 R 内置函数或数据类型交互的 API ,但是其实现效率不高。在 SparkR 中类似的函数还有六个:
createDataFrame()
collect()
dapply()
dapplyCollect()
gapply()
gapplyCollect()
简单来说,createDataFrame() 和 collect() 需要在 JVM 和 R 之间进行序列化/反序列化,并且对数据进行转换,比如 Java 中的字符串需要转换成 R 中的 character。
原始实现(Native implementation)
上图中 SparkR DataFrame 的计算是分布在 Spark 集群上所有可用的节点上。如果不需要将数据以 R 的 data.frame 进行收集(collect)或不需要执行 R 内置函数,则在 Driver 或 executor 端不需要与 R 进程进行通信。但是当它需要使用 R 的 data.frame 或使用 R 的内置函数时,需要 Driver 或 executor 使用 sockets 使得 JVM 和 R 进行通信。
这需要在 JVM 和 R 直接对交换的数据进行序列化和反序列化操作,而这个操作的编码格式非常低效,完全没有考虑到现代 CPU 的设计,比如 CPU pipelining。
向量化执行(Vectorized implementation)
在 Apache Spark 3.0 中,SparkR 中引入了一种新的向量化(vectorized)实现,它利用 Apache Arrow 直接在 JVM 和 R 之间交换数据,且(反)序列化成本非常小,具体如下:
新的实现方式并没有在 JVM 和 R 之间使用低效的格式对数据逐行进行(反)序列化,而是利用 Apache Arrow 以高效的列格式进行流水线处理和单指令多数据(SIMD)。
新的矢量化 SparkR API 默认情况下未启用,但可以通过在 Apache Spark 3.0 中将 spark.sql.execution.arrow.sparkr.enabled 设置为 true 来启用。注意,dapplyCollect() 和 gapplyCollect() 矢量化操作尚未实现。建议使用 dapply() 和 gapply() 来替代。
基准测试结果
下面的基准测试使用的数据集为 500,000 条记录。分别测试使用和未使用矢量化的执行时间:
使用矢量化优化之后,collect() 和 createDataFrame() 性能分别大致提升 17 倍和 42x 倍;而对 dapply() 和 gapply(), 分别提升了43x 和 33x 。
从上面的启发可以看到,如果我们需要在不同系统之间进行数据交互,也可以使用 Apache Arrow。
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。
Apache Spark技术交流社区公众号,微信扫一扫关注

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
7月23日社区直播【TFPark: Distributed TensorFlow in Production on Apache Spark...
直播主题 TFPark: Distributed TensorFlow in Production on Apache Spark 讲师: 汪洋英特尔大数据团队的机器学习工程师,专注于分布式机器学习框架和应用。他是Analytics Zoo和BigDL的核心贡献者之一。 时间: 7月23日 19:00 观看直播方式: 扫描下方二维码入群,或届时进入直播间(回看链接)https://developer.aliyun.com/live/43484 直播介绍 TFPark是开源AI平台Analytics Zoo中一个模块,它的可以很方便让用户在Spark集群中分布式地进行TensorFlow模型的训练和推断。一方面,TFPark利用Spark将TensorFlow 定义的AI训练或推理任务无缝的嵌入到用户的大数据流水线中,而无需对现有集群做任何修改;另一方面TFPark屏蔽了复杂的分布式系统逻辑,可以将单机开发的AI应用轻松扩展到几十甚至上百节点上。本次分享将介绍TFPark的使用,内部实现以及在生产环境中的实际案例。
- 下一篇
初次使用 Elasticsearch 遇多种分词难题?那是你没掌握这些原理
作者介绍 魏彬,普翔科技 CTO,开源软件爱好者,中国第一位 Elastic 认证工程师,《Elastic日报》和 《ElasticTalk》社区项目发起人,被 elastic 中国公司授予 2019 年度合作伙伴架构师特别贡献奖。对 Elasticsearch、Kibana、Beats、Logstash、Grafana 等开源软件有丰富的实践经验,为零售、金融、保险、证券、科技等众多行业的客户提供过咨询和培训服务,帮助客户在实际业务中找准开源软件的定位,实现从 0 到 1 的落地、从 1 到 N 的拓展,产生实际的业务价值。 初次接触 Elasticsearch 的同学经常会遇到分词相关的难题,比如如下这些场景: 1、为什么命名有包含搜索关键词的文档,但结果里面就没有相关文档呢?2、我存进去的文档到底被分成哪些词(term)了?3、我得自定义分词规则,但感觉好麻烦呢,无从下手 如果你遇到过类似的问题,希望本文可以解决你的疑惑。 一、上手 让我们从一个实例出发,如下创建一个文档: PUT test/doc/1 { "msg":"Eating an apple a day keeps d...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2整合Thymeleaf,官方推荐html解决方案