您现在的位置是:首页 > 文章详情

Spark获取当前分区的partitionId

日期:2018-07-05点击:374
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80943341

我的原创地址:https://dongkelun.com/2018/06/28/sparkGetPartitionId/

前言

本文讲解Spark如何获取当前分区的partitionId,这是一位群友提出的问题,其实只要通过TaskContext.get.partitionId(我是在官网上看到的),下面给出一些示例。

1、代码

下面的代码主要测试SparkSession,SparkContext创建的rdd和df是否都支持。

package com.dkl.leanring.partition import org.apache.spark.sql.SparkSession import org.apache.spark.TaskContext /** * 获取当前分区的partitionId */ object GetPartitionIdDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("GetPartitionIdDemo").master("local").getOrCreate() val sc = spark.sparkContext val data = Seq(1, 2, 3, 4) // 测试rdd,三个分区 val rdd = sc.parallelize(data, 3) rdd.foreach(i => { println("partitionId:" + TaskContext.get.partitionId) }) import spark.implicits._ // 测试df,三个分区 val df = rdd.toDF("id") df.show df.foreach(row => { println("partitionId:" + TaskContext.get.partitionId) }) // 测试df,两个分区 val data1 = Array((1, 2), (3, 4)) val df1 = spark.createDataFrame(data1).repartition(2) df1.show() df1.foreach(row => { println("partitionId:" + TaskContext.get.partitionId) }) } }

2、结果

原文链接:https://yq.aliyun.com/articles/676189
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章