Spark SQL,如何将 DataFrame 转为 json 格式
今天主要介绍一下如何将 Spark dataframe 的数据转成 json 数据。用到的是 scala 提供的 json 处理的 api。
用过 Spark SQL 应该知道,Spark dataframe 本身有提供一个 api 可以供我们将数据转成一个 JsonArray,我们可以在 spark-shell 里头举个栗子来看一下。
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate(); //提供隐式转换功能,比如将 Rdd 转为 dataframe import spark.implicits._ val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF() df.show() /*-------------show ----------- +---+---+ | _1| _2| +---+---+ |abc| 2| |efg| 4| +---+---+ */ //这里使用 dataframe Api 转换成 jsonArray val jsonStr:String = a.toJSON.collectAsList.toString /*--------------- json String------------- [{"_1":"abc","_2":2}, {"_1":"efg","_2":4}] */
可以发现,我们可以使用 dataframe 提供的 api 直接将 dataframe 转换成 jsonArray 的形式,但这样子却有些冗余。以上面的例子来说,很多时候我要的不是这样的形式。
[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
而是下面这种形式。
[{"abc":2}, {"efg":4}]
这才是我们通常会使用到的 json 格式。以 dataframe 的 api 转换而成的 json 明显太过冗余。为此,我们需要借助一些 json 处理的包,本着能懒则懒的原则,直接使用 scala 提供的 json 处理包。
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate(); //提供隐式转换功能,比如将 Rdd 转为 dataframe import spark.implicits._ val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF() df.show() /*-------------show ----------- +---+---+ | _1| _2| +---+---+ |abc| 2| |efg| 4| +---+---+ */ //接下来不一样了 val df2Array:Array[Tuple2[String,Int]] = df.collect().map{case org.apache.spark.sql.Row(x:String,y:Int) => (x,y)} val jsonData:Array[JSONObject] = aM.map{ i => new JSONObject(Map(i._1 -> i._2)) } val jsonArray:JSONArray = new JSONArray(jsonData.toList) /*-----------jsonArray------------ [{"abc" : 2}, {"efg" : 4}] */
大概说明一下上述的代码,首先我们要先将 df 变量进行 collect 操作,将它转换成 Array ,但是要生成 jsonObject 得是 Array[Tuple2[T,T]] 的格式,所以我们需要再进一步转换成对应格式。这里的 map 是函数式编程里面的 map 。
然后也是用 map 操作生成 Array[JSONObject],最后再转换成 JSONArray 就可以。
将数据转换成 json 的格式通常不能太大,一般用在 spark 跑出数据结果后写入到其他数据库的时候会用到,比如 Mysql 。
以上~~
欢迎关注公众号哈尔的数据城堡,里面有数据,代码,以及深度的思考。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark分布式计算引擎的应用
什么是分布式计算 基本概念 分布式计算和它相反,运算过程是同时发生在不同机器上执行的,然后通过一定的机制将每台机器的结果聚合得出最后的数据结论--> 和集中式计算相反,分布式计算的一个计算过程将会在多台机器上进行。组件之间彼此进行交互以实现一个共同的目标,把需要进行大量计算的工程数据分区成小块,由多台计算机分别计算,再上传运算结果后,将结果统一合并得出数据结论。 简单说就是1个人干活和100个人干活的区别。 就像CPU从单核变多核一样,然后发展处超线程这种技术,从单台机器的集中式计算发展为多台机器的分布式计算是随着计算机的发展自然而然出现的--> 分布式计算是一门计算机科学的研究课题,涉及到许多分支技术(CS模型、集群技术、通用型分布式计算环境等)。 以下仅涉及其中一部分内容:从分布式计算的理论基础中实现,并且已经得到了大规模生产环境验证的计算框架。 如何实现 要实现分布式计算首先要解决其中两个最重要的问题: 1.如何拆分计算逻辑 2.如何分发计算逻辑 拆分逻辑 计算逻辑要实现分布式,就必须要解决:如何将一个巨大的问题拆分成相对独立的子问题分发到各个机器上求解。 从在哪里发生计算的角度...
- 下一篇
Spark操作Hive分区表
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/84867600 我的原创地址:https://dongkelun.com/2018/12/04/sparkHivePatition/ 前言 前面学习总结了Hive分区表,现在学习总结一下Spark如何操作Hive分区表,包括利用Spark DataFrame创建Hive的分区表和Spark向已经存在Hive分区表里插入数据,并记录一下遇到的问题以及如何解决。 1、Spark创建分区表 只写主要代码,完整代码见附录 val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017")) val df = spark.createDataFrame(data).toDF("id", "name", "age", "year") //可以将append改为overwrite,这样如果表已存在会删掉之前的表,...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Hadoop3单机部署,实现最简伪集群
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS关闭SELinux安全模块
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池