Spark-SparkSQL深入学习系列八(转自OopsOutOfMemory)
/** Spark SQL源码分析系列文章*/
在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准。
在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能。但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF:
spark1.0及以前的实现:
- protected[sql] lazy val catalog: Catalog = new SimpleCatalog
- @transient
- protected[sql] lazy val analyzer: Analyzer =
- new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true) //EmptyFunctionRegistry空实现
- @transient
- protected[sql] val optimizer = Optimizer
Spark1.1及以后的实现:
- protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry //SimpleFunctionRegistry实现,支持简单的UDF
- @transient
- protected[sql] lazy val analyzer: Analyzer =
- new Analyzer(catalog, functionRegistry, caseSensitive = true)
一、引子:
对于SQL语句中的函数,会经过SqlParser的的解析成UnresolvedFunction。UnresolvedFunction最后会被Analyzer解析。
SqlParser:
除了非官方定义的函数外,还可以定义自定义函数,sql parser会进行解析。
- ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {
- case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)
只是这个Expression的dataType等一系列属性和eval计算方法均无法访问,强制访问会抛出异常,因为它没有被Resolved,只是一个载体。
- case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {
- override def dataType = throw new UnresolvedException(this, "dataType")
- override def foldable = throw new UnresolvedException(this, "foldable")
- override def nullable = throw new UnresolvedException(this, "nullable")
- override lazy val resolved = false
- // Unresolved functions are transient at compile time and don't get evaluated during execution.
- override def eval(input: Row = null): EvaluatedType =
- throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
- override def toString = s"'$name(${children.mkString(",")})"
- }<strong></strong>
Analyzer:
Analyzer初始化的时候会需要Catalog,database和table的元数据关系,以及FunctionRegistry来维护UDF名称和UDF实现的元数据,这里使用SimpleFunctionRegistry。
- /**
- * Replaces [[UnresolvedFunction]]s with concrete [[catalyst.expressions.Expression Expressions]].
- */
- object ResolveFunctions extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case q: LogicalPlan =>
- q transformExpressions { //对当前LogicalPlan进行transformExpressions操作
- case u @ UnresolvedFunction(name, children) if u.childrenResolved => //如果遍历到了UnresolvedFunction
- registry.lookupFunction(name, children) //从UDF元数据表里查找udf函数
- }
- }
- }
二、UDF注册
2.1 UDFRegistration
registerFunction是UDFRegistration下的方法,SQLContext现在实现了UDFRegistration这个trait,只要导入SQLContext,即可以使用udf功能。
UDFRegistration核心方法registerFunction:
registerFunction方法签名def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit
接受一个udfName 和 一个FunctionN,可以是Function1 到Function22。即这个udf的参数只支持1-22个。(scala的痛啊)
内部builder通过ScalaUdf来构造一个Expression,这里ScalaUdf继承自Expression(可以简单的理解目前的SimpleUDF即是一个Catalyst的一个Expression),传入scala的function作为UDF的实现,并且用反射检查字段类型是否是Catalyst允许的,见ScalaReflection.
- def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {
- def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)//构造Expression
- functionRegistry.registerFunction(name, builder)//向SQLContext的functionRegistry(维护了一个hashMap来管理udf映射)注册
注意:这里FunctionBuilder是一个type FunctionBuilder = Seq[Expression] => Expression
- class SimpleFunctionRegistry extends FunctionRegistry {
- val functionBuilders = new mutable.HashMap[String, FunctionBuilder]() //udf映射关系维护[udfName,Expression]
- def registerFunction(name: String, builder: FunctionBuilder) = { //put expression进Map
- functionBuilders.put(name, builder)
- }
- override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
- functionBuilders(name)(children) //查找udf,返回Expression
- }
- }
三、UDF计算:
UDF既然已经被封装为catalyst树里的一个Expression节点,那么计算的时候也就是计算ScalaUdf的eval方法。
先通过Row和表达式计算function所需要的参数,最后通过反射调用function,来达到计算udf的目的。
ScalaUdf继承自Expression:
scalaUdf接受一个function, dataType,和一系列表达式。
比较简单,看注释即可:
- case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
- extends Expression {
- type EvaluatedType = Any
- def nullable = true
- override def toString = s"scalaUDF(${children.mkString(",")})"
- override def eval(input: Row): Any = {
- val result = children.size match {
- case 0 => function.asInstanceOf[() => Any]()
- case 1 => function.asInstanceOf[(Any) => Any](children(0).eval(input)) //反射调用function
- case 2 =>
- function.asInstanceOf[(Any, Any) => Any](
- children(0).eval(input), //表达式参数计算
- children(1).eval(input))
- case 3 =>
- function.asInstanceOf[(Any, Any, Any) => Any](
- children(0).eval(input),
- children(1).eval(input),
- children(2).eval(input))
- case 4 =>
- ......
- case 22 => //scala function只支持22个参数,这里枚举了。
- function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
- children(0).eval(input),
- children(1).eval(input),
- children(2).eval(input),
- children(3).eval(input),
- children(4).eval(input),
- children(5).eval(input),
- children(6).eval(input),
- children(7).eval(input),
- children(8).eval(input),
- children(9).eval(input),
- children(10).eval(input),
- children(11).eval(input),
- children(12).eval(input),
- children(13).eval(input),
- children(14).eval(input),
- children(15).eval(input),
- children(16).eval(input),
- children(17).eval(input),
- children(18).eval(input),
- children(19).eval(input),
- children(20).eval(input),
- children(21).eval(input))
四、总结
Spark目前的UDF其实就是scala function。将scala function封装到一个Catalyst Expression当中,在进行sql计算时,使用同样的Eval方法对当前输入Row进行计算。
编写一个spark udf非常简单,只需给UDF起个函数名,并且传递一个scala function即可。依靠scala函数编程的表现能力,使得编写scala udf比较简单,且相较hive的udf更容易使人理解。
——EOF——
原创文章,转载请注明:
转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory
本文链接地址:http://blog.csdn.net/oopsoom/article/details/39395641
注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark-SparkSQL深入学习系列六(转自OopsOutOfMemory)
/**Spark SQL源码分析系列文章*/ 前面几篇文章主要介绍的是Sparksql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan。物理计划是Spark SQL执行Spark job的前置,也是最后一道计划。 如图: 一、SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有SparkPlanner来对Optimized Logical Plan进行转换,生成Physical plans。 [java] view plain copy lazyvaloptimizedPlan=optimizer(analyzed) //TODO:Don'tjustpickthefirstone... lazyvalsparkPlan=planner(optimizedPlan).next() SparkPlanner的apply方法,会返回一个Iterator[PhysicalP...
- 下一篇
Spark-SparkSQL深入学习系列九(转自OopsOutOfMemory)
/**Spark SQL源码分析系列文章*/ SparkSQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率。 这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage、Column Based Storage、 PAX Storage。 Spark SQL 的内存数据是如何组织的? Spark SQL 将数据加载到内存是以列的存储结构。称为In-Memory Columnar Storage。 若直接存储JavaObject 会产生很大的内存开销,并且这样是基于Row的存储结构。查询某些列速度略慢,虽然数据以及载入内存,查询效率还是低于面向列的存储结构。 基于Row的Java Object存储: 内存开销大,且容易FULL GC,按列查询比较慢。 基于Column的ByteBuffer存储(Spark SQL): 内存开销小,按列查询速度较快。 Spark SQL的In-Me...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- 设置Eclipse缩进为4个空格,增强代码规范
- Hadoop3单机部署,实现最简伪集群
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- 2048小游戏-低调大师作品
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2更换Tomcat为Jetty,小型站点的福音