Spark 2.4.0编程指南--Spark SQL UDF和UDAF
Spark 2.4.0编程指南--Spark SQL UDF和UDAF
更多资源
视频
- Spark 2.4.0编程指南--Spark SQL UDF和UDAF(bilibili视频) : https://www.bilibili.com/video/av38193405/?p=4
<iframe width="800" height="500" src="//player.bilibili.com/player.html?aid=38193405&cid=67137841&page=4" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"> </iframe>
文档
前置条件
- 已安装好java(选用的是java 1.8.0_191)
- 已安装好scala(选用的是scala 2.11.121)
- 已安装好hadoop(选用的是Hadoop 3.1.1)
- 已安装好spark(选用的是spark 2.4.0)
技能标签
- 了解UDF 用户定义函数(User-defined functions, UDFs)
- 了解UDAF (user-defined aggregate function), 用户定义的聚合函数
- UDF示例(统计行数据字符长度)
- UDF示例(统计行数据字符转大写)
- UDAF示例(统计总行数)
- UDAF示例(统计最大收入)
- UDAF示例(统计平均收入)
- UDAF示例(统计按性别分组的最大收入)
- 官网: http://spark.apache.org/docs/2.4.0/sql-getting-started.html#aggregations
UDF
用户定义函数(User-defined functions, UDFs)是大多数 SQL 环境的关键特性,用于扩展系统的内置功能。 UDF允许开发人员通过抽象其低级语言实现来在更高级语言(如SQL)中启用新功能。 Apache Spark 也不例外,并且提供了用于将 UDF 与 Spark SQL工作流集成的各种选项。
- 用户定义函数(User-defined functions, UDFs)
- UDF对表中的单行进行转换,以便为每行生成单个对应的输出值
##示例
- 得到SparkSession
BaseSparkSession
/** * 得到SparkSession * 首先 extends BaseSparkSession * 本地: val spark = sparkSession(true) * 集群: val spark = sparkSession() */ class BaseSparkSession { var appName = "sparkSession" var master = "spark://standalone.com:7077" //本地模式:local standalone:spark://master:7077 def sparkSession(): SparkSession = { val spark = SparkSession.builder .master(master) .appName(appName) .config("spark.eventLog.enabled","true") .config("spark.history.fs.logDirectory","hdfs://standalone.com:9000/spark/log/historyEventLog") .config("spark.eventLog.dir","hdfs://standalone.com:9000/spark/log/historyEventLog") .getOrCreate() spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar") //import spark.implicits._ spark } def sparkSession(isLocal:Boolean = false): SparkSession = { if(isLocal){ master = "local" val spark = SparkSession.builder .master(master) .appName(appName) .getOrCreate() //spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar") //import spark.implicits._ spark }else{ val spark = SparkSession.builder .master(master) .appName(appName) .config("spark.eventLog.enabled","true") .config("spark.history.fs.logDirectory","hdfs://standalone.com:9000/spark/log/historyEventLog") .config("spark.eventLog.dir","hdfs://standalone.com:9000/spark/log/historyEventLog") .getOrCreate() // spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar") //import spark.implicits._ spark } } /** * 得到当前工程的路径 * @return */ def getProjectPath:String=System.getProperty("user.dir") }
UDF (统计字段长度)
- 对数据集中,每行数据的特定字段,计算字符长度
- 通过 spark.sql 直接在字段查询处调用函数名称
/** * 自定义匿名函数 * 功能: 得到某列数据长度的函数 */ object Run extends BaseSparkSession{ def main(args: Array[String]): Unit = { val spark = sparkSession(true) val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json") ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ spark.udf.register("strLength",(str: String) => str.length()) ds.createOrReplaceTempView("employees") spark.sql("select name,salary,strLength(name) as name_Length from employees").show() // +-------+------+-----------+ // | name|salary|name_Length| // +-------+------+-----------+ // |Michael| 3000| 7| // | Andy| 4500| 4| // | Justin| 3500| 6| // | Berta| 4000| 5| // +-------+------+-----------+ spark.stop() } }
UDF (字段转成大写)
- 对数据集中,每行数据的特定字段,计算字符长度
- 通过 dataSet.withColumn 调用column
- Column通过udf函数转换
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession /** * 自定义匿名函数 * 功能: 得到某列数据长度的函数 */ object Run extends BaseSparkSession{ def main(args: Array[String]): Unit = { val spark = sparkSession(true) val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json") ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ import org.apache.spark.sql.functions._ val strUpper = udf((str: String) => str.toUpperCase()) import spark.implicits._ ds.withColumn("toUpperCase", strUpper($"name")).show // +-------+------+-----------+ // | name|salary|toUpperCase| // +-------+------+-----------+ // |Michael| 3000| MICHAEL| // | Andy| 4500| ANDY| // | Justin| 3500| JUSTIN| // | Berta| 4000| BERTA| // +-------+------+-----------+ spark.stop() } }
UDAF
- UDAF(user-defined aggregate function, 用户定义的聚合函数
- 同时处理多行,并且返回一个结果,通常结合使用 GROUP BY 语句(例如 COUNT 或 SUM)
count
- 统计一共有多少行数据
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_01_spark_udaf_count import com.opensource.bigdata.spark.standalone.base.BaseSparkSession import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ /** * ).initialize()方法,初使使,即没数据时的值 * ).update() 方法把每一行的数据进行计算,放到缓冲对象中 * ).merge() 把每个分区,缓冲对象进行合并 * ).evaluate()计算结果表达式,把缓冲对象中的数据进行最终计算 */ object Run2 extends BaseSparkSession{ object CustomerCount extends UserDefinedAggregateFunction{ //聚合函数的输入参数数据类型 def inputSchema: StructType = { StructType(StructField("inputColumn",StringType) :: Nil) } //中间缓存的数据类型 def bufferSchema: StructType = { StructType(StructField("sum",LongType) :: Nil) } //最终输出结果的数据类型 def dataType: DataType = LongType def deterministic: Boolean = true //初始值,要是DataSet没有数据,就返回该值 def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L } /** * * @param buffer 相当于把当前分区的,每行数据都需要进行计算,计算的结果保存到buffer中 * @param input */ def update(buffer: MutableAggregationBuffer, input: Row): Unit ={ if(!input.isNullAt(0)){ buffer(0) = buffer.getLong(0) + 1 } } /** * 相当于把每个分区的数据进行汇总 * @param buffer1 分区一的数据 * @param buffer2 分区二的数据 */ def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={ buffer1(0) = buffer1.getLong(0) +buffer2.getLong(0) // salary } //计算最终的结果 def evaluate(buffer: Row): Long = buffer.getLong(0) } def main(args: Array[String]): Unit = { val spark = sparkSession(true) spark.udf.register("customerCount",CustomerCount) val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json") df.createOrReplaceTempView("employees") val sqlDF = spark.sql("select customerCount(name) as average_salary from employees ") df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ sqlDF.show() // +--------------+ // |average_salary| // +--------------+ // | 4.0| // +--------------+ spark.stop() } }
max
- 统计收入最高的
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_03_spark_udaf_sum import com.opensource.bigdata.spark.standalone.base.BaseSparkSession import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ /** * ).initialize()方法,初使使,即没数据时的值 * ).update() 方法把每一行的数据进行计算,放到缓冲对象中 * ).merge() 把每个分区,缓冲对象进行合并 * ).evaluate()计算结果表达式,把缓冲对象中的数据进行最终计算 */ object Run extends BaseSparkSession{ object CustomerSum extends UserDefinedAggregateFunction{ //聚合函数的输入参数数据类型 def inputSchema: StructType = { StructType(StructField("inputColumn",LongType) :: Nil) } //中间缓存的数据类型 def bufferSchema: StructType = { StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil) } //最终输出结果的数据类型 def dataType: DataType = LongType def deterministic: Boolean = true //初始值,要是DataSet没有数据,就返回该值 def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L } /** * * @param buffer 相当于把当前分区的,每行数据都需要进行计算,计算的结果保存到buffer中 * @param input */ def update(buffer: MutableAggregationBuffer, input: Row): Unit ={ if(!input.isNullAt(0)){ buffer(0) = buffer.getLong(0) + input.getLong(0) } } /** * 相当于把每个分区的数据进行汇总 * @param buffer1 分区一的数据 * @param buffer2 分区二的数据 */ def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={ buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) } //计算最终的结果 def evaluate(buffer: Row): Long = buffer.getLong(0) } def main(args: Array[String]): Unit = { val spark = sparkSession(true) spark.udf.register("customerSum",CustomerSum) val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json") df.createOrReplaceTempView("employees") val sqlDF = spark.sql("select customerSum(salary) as average_salary from employees ") df.show // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ sqlDF.show() // +--------------+ // |average_salary| // +--------------+ // | 15000| // +--------------+ spark.stop() } }
average
- 统计平均收入水平
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_04_spark_udaf_average import com.opensource.bigdata.spark.standalone.base.BaseSparkSession import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ object Run extends BaseSparkSession{ object MyAverage extends UserDefinedAggregateFunction{ //聚合函数的输入参数数据类型 def inputSchema: StructType = { StructType(StructField("inputColumn",LongType) :: Nil) } //中间缓存的数据类型 def bufferSchema: StructType = { StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil) } //最终输出结果的数据类型 def dataType: DataType = DoubleType def deterministic: Boolean = true //初始值,要是DataSet没有数据,就返回该值 def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } /** * * @param buffer 相当于把当前分区的,每行数据都需要进行计算,计算的结果保存到buffer中 * @param input */ def update(buffer: MutableAggregationBuffer, input: Row): Unit ={ if(!input.isNullAt(0)){ buffer(0) = buffer.getLong(0) + input.getLong(0) // salary buffer(1) = buffer.getLong(1) + 1 // count } } /** * 相当于把每个分区的数据进行汇总 * @param buffer1 分区一的数据 * @param buffer2 分区二的数据 */ def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={ buffer1(0) = buffer1.getLong(0) +buffer2.getLong(0) // salary buffer1(1) = buffer1.getLong(1) +buffer2.getLong(1) // count } //计算最终的结果 def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } def main(args: Array[String]): Unit = { val spark = sparkSession(true) spark.udf.register("MyAverage",MyAverage) val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json") df.createOrReplaceTempView("employees") val sqlDF = spark.sql("select MyAverage(salary) as average_salary from employees ") sqlDF.show() spark.stop() } }
group by max
- 按性别分组统计收入最高是多少
- 即统计男,女,各收入最高是多少
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_05_spark_udaf_groupby_max import com.opensource.bigdata.spark.standalone.base.BaseSparkSession import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ /** * ).initialize()方法,初使使,即没数据时的值 * ).update() 方法把每一行的数据进行计算,放到缓冲对象中 * ).merge() 把每个分区,缓冲对象进行合并 * ).evaluate()计算结果表达式,把缓冲对象中的数据进行最终计算 */ object Run extends BaseSparkSession{ object CustomerMax extends UserDefinedAggregateFunction{ //聚合函数的输入参数数据类型 def inputSchema: StructType = { StructType(StructField("inputColumn",LongType) :: Nil) } //中间缓存的数据类型 def bufferSchema: StructType = { StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil) } //最终输出结果的数据类型 def dataType: DataType = LongType def deterministic: Boolean = true //初始值,要是DataSet没有数据,就返回该值 def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L } /** * * @param buffer 相当于把当前分区的,每行数据都需要进行计算,计算的结果保存到buffer中 * @param input */ def update(buffer: MutableAggregationBuffer, input: Row): Unit ={ if(!input.isNullAt(0)){ if(input.getLong(0) > buffer.getLong(0)){ buffer(0) = input.getLong(0) } } } /** * 相当于把每个分区的数据进行汇总 * @param buffer1 分区一的数据 * @param buffer2 分区二的数据 */ def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={ if( buffer2.getLong(0) > buffer1.getLong(0)) buffer1(0) = buffer2.getLong(0) } //计算最终的结果 def evaluate(buffer: Row): Long = buffer.getLong(0) } def main(args: Array[String]): Unit = { val spark = sparkSession(true) spark.udf.register("customerMax",CustomerMax) val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employeesCN.json") df.createOrReplaceTempView("employees") val sqlDF = spark.sql("select gender,customerMax(salary) as average_salary from employees group by gender ") df.show // +------+----+------+ // |gender|name|salary| // +------+----+------+ // | 男|小王| 30000| // | 女|小丽| 50000| // | 男|小军| 80000| // | 女|小李| 90000| // +------+----+------+ sqlDF.show() // +------+--------------+ // |gender|average_salary| // +------+--------------+ // | 男| 80000| // | 女| 90000| // +------+--------------+ spark.stop() } }
其它支持
- Spark SQL 支持集成现有 Hive 中的 UDF ,UDAF 和 UDTF 的(Java或Scala)实现。
- UDTFs(user-defined table functions, 用户定义的表函数)可以返回多列和多行 end
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Azkaban源码编译
Azkaban源码编译 Azkaban没有提供成品的安装包,需要自己编译,其构建有两个硬性条件: 1.Azkaban是使用Gradle构建的。 2.Azkaban使用JDK版本必须是1.8及其以上的,这是一个强依赖。 1、编译环境 1.操作系统 官方提示可以使用Linux,OS X 等*nix平台。 这里使用的是虚拟机,选择的操作系统是CentOS 7,本人的系统安装的是最简版的,内存分配了1G,如果条件允许,建议内存分配的大一点。不然编译的时间会很长。 CentOS 6.5也是可以的,但是会遇到很多问题,这里建议使用CentOS7操作系统。 2.安装JDK 这里选择的是jdk1.8.0_131版本。 jdk的安装这里忽略。 3.安装git 使用如下命令进行安装: yum install git 安装过程中遇到选择y/n的选项,全部选择y。 如果不安装git在后续的编译过程中,会报错,错误信息如下: 4.安装g++ 使用如下命令进行安装: yum install gcc-c++ 安装过程中遇到选择y/n的选项,全部选择y。 2、下载源码 git下载 官方提供的是git下载,下载命令如下...
- 下一篇
原生JavaScript进行前后端同构
什么是前后端同构 明确三个概念:「后端渲染」指传统的 ASP、Java 或 PHP 的渲染机制;「前端渲染」指使用 JS 来渲染页面大部分内容,代表是现在流行的 SPA 单页面应用;「同构渲染」指前后端共用 JS,首次渲染时使用 Node.js 来直出 HTML。一般来说同构渲染是介于前后端中的共有部分。 感觉前端的确是折腾,之前还在流行前后端分离,现在怎么又要做前后端同构了? 原因是现在流行的SPA前端单页面应用比较沉重,首次访问需要加载文件较多,第一次加载过慢,用户需要等待前端进行渲染页面。而且不利于SEO及缓存,并且有一定的开发门槛。 前后端同构通过复用模板和JS文件,让一份代码可以同时跑在服务器和浏览器,首次渲染使用nodejs渲染页面,之后使用SPA路由跳转。可以有效减少用户首次访问的等待时间,并且对SEO比较友好,也便于缓存。 项目简介 本前后端同构项目主要分为两个部分,一个是基于koa2的渲染服务器,另一个是基于原生JS和zepto的前端SPA。 项目的特点是不使用vue和react等框架,门槛低,开发速度快,便于上手,比较轻巧,核心的router部分只有一百行左右的代码...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2更换Tomcat为Jetty,小型站点的福音