Spark UDF使用详解及代码示例
我的原创地址:https://dongkelun.com/2018/08/02/sparkUDF/
前言
本文介绍如何在Spark Sql和DataFrame中使用UDF,如何利用UDF给一个表或者一个DataFrame根据需求添加几列,并给出了旧版(Spark1.x)和新版(Spark2.x)完整的代码示例。
- 关于UDF:UDF:User Defined Function,用户自定义函数。
1、创建测试用DataFrame
下面以Spark2.x为例给出代码,关于Spark1.x创建DataFrame可在最后的完整代码里查看。
// 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = spark.createDataFrame(userData).toDF("name", "age") userDF.show
+-----+---+ | name|age| +-----+---+ | Leo| 16| |Marry| 21| | Jack| 14| | Tom| 18| +-----+---+
// 注册一张user表 userDF.createOrReplaceTempView("user")
2、Spark Sql用法
2.1 通过匿名函数注册UDF
下面的UDF的功能是计算某列的长度,该列的类型为String
2.1.1 注册
- Spark2.x:
spark.udf.register("strLen", (str: String) => str.length())
- Spark1.x:
sqlContext.udf.register("strLen", (str: String) => str.length())
2.2.2 使用
仅以Spark2.x为例
spark.sql("select name,strLen(name) as name_len from user").show
+-----+--------+ | name|name_len| +-----+--------+ | Leo| 3| |Marry| 5| | Jack| 4| | Tom| 3| +-----+--------+
2.2 通过实名函数注册UDF
实名函数的注册有点不同,要在后面加 _(注意前面有个空格)
定义一个实名函数
/** * 根据年龄大小返回是否成年 成年:true,未成年:false */ def isAdult(age: Int) = { if (age < 18) { false } else { true } }
注册(仅以Spark2.x为例)
spark.udf.register("isAdult", isAdult _)
至于使用都是一样的
2.3 关于spark.udf和sqlContext.udf
在Spark2.x里,两者实际最终都是调用的spark.udf
sqlContext.udf源码
def udf: UDFRegistration = sparkSession.udf
可以看到调用的是sparkSession的udf,即spark.udf
3、DataFrame用法
DataFrame的udf方法虽然和Spark Sql的名字一样,但是属于不同的类,它在org.apache.spark.sql.functions里,下面是它的用法
3.1注册
import org.apache.spark.sql.functions._ //注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _)
3.2 使用
可通过withColumn和select使用,下面的代码已经实现了给user表添加两列的功能
* 通过看源码,下面的withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究
//通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show
结果均为
+-----+---+--------+-------+ | name|age|name_len|isAdult| +-----+---+--------+-------+ | Leo| 16| 3| false| |Marry| 21| 5| true| | Jack| 14| 4| false| | Tom| 18| 3| true| +-----+---+--------+-------+
3.3 withColumn和select的区别
可通过withColumn的源码看出withColumn的功能是实现增加一列,或者替换一个已存在的列,他会先判断DataFrame里有没有这个列名,如果有的话就会替换掉原来的列,没有的话就用调用select方法增加一列,所以如果我们的需求是增加一列的话,两者实现的功能一样,且最终都是调用select方法,但是withColumn会提前做一些判断处理,所以withColumn的性能不如select好。
* 注:select方法和sql 里的select一样,如果新增的列名在表里已经存在,那么结果里允许出现两列列名相同但数据不一样,大家可以自己试一下。
/** * Returns a new Dataset by adding a column or replacing the existing column that has * the same name. * * @group untypedrel * @since 2.0.0 */ def withColumn(colName: String, col: Column): DataFrame = { val resolver = sparkSession.sessionState.analyzer.resolver val output = queryExecution.analyzed.output val shouldReplace = output.exists(f => resolver(f.name, colName)) if (shouldReplace) { val columns = output.map { field => if (resolver(field.name, colName)) { col.as(colName) } else { Column(field) } } select(columns : _*) } else { select(Column("*"), col.as(colName)) } }
4、完整代码
下面的代码的功能是使用UDF给user表添加两列:name_len、isAdult,每个输出结果都是一样的
+-----+---+--------+-------+ | name|age|name_len|isAdult| +-----+---+--------+-------+ | Leo| 16| 3| false| |Marry| 21| 5| true| | Jack| 14| 4| false| | Tom| 18| 3| true| +-----+---+--------+-------+
代码:
package com.dkl.leanring.spark.sql import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession /** * Spark Sql 用户自定义函数示例 */ object UdfDemo { def main(args: Array[String]): Unit = { oldUdf newUdf newDfUdf oldDfUdf } /** * 根据年龄大小返回是否成年 成年:true,未成年:false */ def isAdult(age: Int) = { if (age < 18) { false } else { true } } /** * 旧版本(Spark1.x)Spark Sql udf示例 */ def oldUdf() { //spark 初始化 val conf = new SparkConf() .setMaster("local") .setAppName("oldUdf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = sc.parallelize(userData).toDF("name", "age") // 注册一张user表 userDF.registerTempTable("user") // 注册自定义函数(通过匿名函数) sqlContext.udf.register("strLen", (str: String) => str.length()) sqlContext.udf.register("isAdult", isAdult _) // 使用自定义函数 sqlContext.sql("select *,strLen(name)as name_len,isAdult(age) as isAdult from user").show //关闭 sc.stop() } /** * 新版本(Spark2.x)Spark Sql udf示例 */ def newUdf() { //spark初始化 val spark = SparkSession.builder().appName("newUdf").master("local").getOrCreate() // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = spark.createDataFrame(userData).toDF("name", "age") // 注册一张user表 userDF.createOrReplaceTempView("user") //注册自定义函数(通过匿名函数) spark.udf.register("strLen", (str: String) => str.length()) //注册自定义函数(通过实名函数) spark.udf.register("isAdult", isAdult _) spark.sql("select *,strLen(name) as name_len,isAdult(age) as isAdult from user").show //关闭 spark.stop() } /** * 新版本(Spark2.x)DataFrame udf示例 */ def newDfUdf() { val spark = SparkSession.builder().appName("newDfUdf").master("local").getOrCreate() // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = spark.createDataFrame(userData).toDF("name", "age") import org.apache.spark.sql.functions._ //注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _) //通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show //关闭 spark.stop() } /** * 旧版本(Spark1.x)DataFrame udf示例 * 注意,这里只是用的Spark1.x创建sc的和df的语法,其中注册udf在Spark1.x也是可以使用的的 * 但是withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究 */ def oldDfUdf() { //spark 初始化 val conf = new SparkConf() .setMaster("local") .setAppName("oldDfUdf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = sc.parallelize(userData).toDF("name", "age") import org.apache.spark.sql.functions._ //注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _) //通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show //关闭 sc.stop() } }

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
通过数据库客户端界面工具DBeaver连接Hive
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/81381122 我的原创地址:https://dongkelun.com/2018/07/13/dbeaverConnectHive/ 前言 本文讲解如何通过数据库客户端界面工具DBeaver连接hive,并解决驱动下载不下来的问题。 1、为什么使用客户端界面工具 为什么使用客户端界面工具而不用命令行使用hive 通过界面工具查看分析hive里的数据要方便很多 业务人员没有权限通过命令行连接hive 领导喜欢在界面工具上查看hive里的数据 2、为什么使用DBeaver 其实在网上搜一下,连接hive的工具还有很多,使用DBeaver的原因是因为我之前连接关系型数据库使用的就是DBeaver,正好DBeaver支持连接hive,且个人认为DBeaver确实挺好用的,支持各种关系型数据库,如连接Oracle数据库不需要像plsql那样自己配置连接文件,只需要在界面上输入u...
- 下一篇
HBase详细说明
HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。 适合于存储大表数据(表的规模可以达到数十亿行以及数百万列),并且对大表数据的读、写访问可以达到实时级别; 利用Hadoop HDFS(Hadoop Distributed File System)作为其文件存储系统,提供高可靠性、高性能、列存储、可伸缩、实时读写的数据库系统; 利用ZooKeeper作为协同服务。 与RMDB比较: HBase 分布式存储,面向列。 动态扩展列。 普通商用硬件支持,扩容成本低。 RMDB 数据结构固定。 需要预先定义好数据结构。 需要大量IO,扩展成本大。 HBase适合具有如下需求的应用: 海量数据(TB、PB) 高吞吐量 需要在海量数据中实现高效的随机读取 需要很好的性能伸缩能力 能够同时处理结构化和非结构化的数据 不需要完全拥有传统关系型数据库所具备的ACID特性 数据结构介绍: 结构化数据 具有固定的结构,属性划分,以及类型等信息。我们通常所理解的关系型数据库中所存储的数据信息,大多是结构化数据, 如职工信息表,拥有ID、Name、Phone、Addr...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Hadoop3单机部署,实现最简伪集群
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长