Spark操作Hive分区表
我的原创地址:https://dongkelun.com/2018/12/04/sparkHivePatition/
前言
前面学习总结了Hive分区表,现在学习总结一下Spark如何操作Hive分区表,包括利用Spark DataFrame创建Hive的分区表和Spark向已经存在Hive分区表里插入数据,并记录一下遇到的问题以及如何解决。
1、Spark创建分区表
只写主要代码,完整代码见附录
val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017")) val df = spark.createDataFrame(data).toDF("id", "name", "age", "year") //可以将append改为overwrite,这样如果表已存在会删掉之前的表,新建表 df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
然后在Hive命令行里看一下,新建的表是否有分区字段year
用命令
desc new_test_partition;
或
show create table new_test_partition;
根据下面的结果可以看到新建的表确实有分区字段year
hive> desc new_test_partition; OK id string name string age int year string # Partition Information # col_name data_type comment year string Time taken: 0.432 seconds, Fetched: 9 row(s)
2、向已存在的表插入数据
2.1 Spark创建的分区表
- 这种情况其实和建表语句一样就可以了
- 不需要开启动态分区
df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition")
当然也有其他方式插入数据,会在后面讲到。
2.2 在Hive命令行创建的表
- 这里主要指和Spark创建的表的文件格式不一样,Spark默认的文件格式为PARQUET,为在命令行Hive默认的文件格式为TEXTFILE,这种区别,也导致了异常的出现。
- 需要开启动态分区
- 不开启会有异常:
Exception in thread "main" org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
2.2.1 建表
用Hive分区表学习总结的建表语句建表(之前已经建过就不用重复建了)。
create table test_partition ( id string comment 'ID', name string comment '名字', age int comment '年龄' ) comment '测试分区' partitioned by (year int comment '年') ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
2.2.2 异常
试着用上面的插入语句插入数据
df.write.mode("append").partitionBy("year").saveAsTable("test_partition")
抛出异常
Exception in thread "main" org.apache.spark.sql.AnalysisException: The format of the existing table dkl.test_partition is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
原因就是上面说的文件格式不一致造成的。
2.2.3 解决办法
用fomat指定格式
df.write.mode("append").format("Hive").partitionBy("year").saveAsTable("test_partition")
2.3 其他方法
df.createOrReplaceTempView("temp_table") sql("insert into test_partition select * from temp_table") df.write.insertInto("test_partition")
其中insertInto不需要也不能将df进行partitionBy,否则会抛出异常
df.write.partitionBy("year").insertInto("test_partition") Exception in thread "main" org.apache.spark.sql.AnalysisException: insertInto() can't be used together with partitionBy(). Partition columns have already be defined for the table. It is not necessary to use partitionBy().;
3、完整代码
package com.dkl.blog.spark.hive import org.apache.spark.sql.SparkSession /** * 博客:Spark操作Hive分区表 * https://dongkelun.com/2018/12/04/sparkHivePatition/ * */ object SparkHivePatition { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("SparkHive") .master("local") .config("spark.sql.parquet.writeLegacyFormat", true) .enableHiveSupport() .getOrCreate() import spark.sql val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017")) val df = spark.createDataFrame(data).toDF("id", "name", "age", "year") //创建临时表 df.createOrReplaceTempView("temp_table") //切换hive的数据库 sql("use dkl") // 1、创建分区表,可以将append改为overwrite,这样如果表已存在会删掉之前的表,新建表 df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition") //2、向Spark创建的分区表写入数据 df.write.mode("append").partitionBy("year").saveAsTable("new_test_partition") sql("insert into new_test_partition select * from temp_table") df.write.insertInto("new_test_partition") //开启动态分区 sql("set hive.exec.dynamic.partition.mode=nonstrict") //3、向在Hive里用Sql创建的分区表写入数据,抛出异常 // df.write.mode("append").partitionBy("year").saveAsTable("test_partition") // 4、解决方法 df.write.mode("append").format("Hive").partitionBy("year").saveAsTable("test_partition") sql("insert into test_partition select * from temp_table") df.write.insertInto("test_partition") //这样会抛出异常 // df.write.partitionBy("year").insertInto("test_partition") spark.stop } }
相关阅读
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark SQL,如何将 DataFrame 转为 json 格式
今天主要介绍一下如何将 Spark dataframe 的数据转成 json 数据。用到的是 scala 提供的 json 处理的 api。 用过 Spark SQL 应该知道,Spark dataframe 本身有提供一个 api 可以供我们将数据转成一个 JsonArray,我们可以在 spark-shell 里头举个栗子来看一下。 import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate(); //提供隐式转换功能,比如将 Rdd 转为 dataframe import spark.implicits._ val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF() df.show...
- 下一篇
# Apache Spark系列技术直播# 第四讲 【 机器学习介绍与Spark MLlib实践 】
主讲人:江宇(燕回) 阿里巴巴计算平台EMR技术专家 直播时间:2018.12.06 19:00 - 20:00 内容提要:本次讲座主要面对的是机器学习的入门者,以及想要使用Spark来进行机器学习的用户。我们会介绍一下机器学习相关领域的基础知识,以及机器学习在spark上面的实践,同时给出我们的一些使用建议。 视频分享:https://yq.aliyun.com/live/693ppt分享:https://yq.aliyun.com/download/3129 欢迎扫码进群共同交流:
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Mario游戏-低调大师作品
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用