Spark: Spark SQL In-Depth Study Series, Part 5 (reposted from OopsOutOfMemory)
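/** Spark SQL source code analysis series */

The previous articles covered the core execution flow of Spark SQL's Catalyst, the SqlParser, the Analyzer, and the core TreeNode library. This article explains the optimization ideas behind Spark SQL's Optimizer and how the Optimizer is expressed inside Catalyst, with some hands-on experiments added to give an intuitive feel for it.

The Optimizer's main job is to take the resolved Logical Plan produced by the Analyzer and optimize the syntax tree by applying batches of optimization strategies. It optimizes both logical plan nodes (Logical Plan) and expressions (Expression), and it is the step that precedes conversion into a physical execution plan. (The original post illustrates this with a figure.)

1. Optimizer

Optimizer is the only class in Catalyst's optimizer package. It works much like the Analyzer, because both extend RuleExecutor[LogicalPlan] and both execute a series of Batch operations.

The batches in Optimizer cover three kinds of optimization strategy:

1. Combine Limits: merge Limits
2. ConstantFolding: fold constants
3. Filter Pushdown: push filters down

The rule objects used by each Batch are all defined in Optimizer:

    object Optimizer extends RuleExecutor[LogicalPlan] {
      val batches =
        Batch("Combine Limits", FixedPoint(100),
          CombineLimits) ::
        Batch("ConstantFolding", FixedPoint(100),
          NullPropagation,
          ConstantFolding,
          BooleanSimplification,
          SimplifyFilters,
          SimplifyCasts,
          SimplifyCaseConversionExpressions) ::
        Batch("Filter Pushdown", FixedPoint(100),
          CombineFilters,
          PushPredicateThroughProject,
          PushPredicateThroughJoin,
          ColumnPruning) :: Nil
    }

One more point: the Optimizer optimizes not only the Logical Plan but also the Expressions inside it, so it helps to know the Expression-related classes. The two members that matter here are references and outputSet. references is the set of attributes that a Logical Plan or Expression node depends on, and outputSet is the set of all Attributes that a Logical Plan outputs.

For example, Aggregate is a Logical Plan whose references is the deduplicated union of the references of its group by expressions and its aggregate expressions.

    case class Aggregate(
        groupingExpressions: Seq[Expression],
        aggregateExpressions: Seq[NamedExpression],
        child: LogicalPlan)
      extends UnaryNode {

      override def output = aggregateExpressions.map(_.toAttribute)
      override def references =
        (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
    }

2. Optimization Strategies in Detail

The Optimizer's strategies include rules that transform the plan and rules that transform expressions. The principle is the same in both cases: traverse the tree and apply the optimization Rule. One thing to note: the Logical Plan is transformed in pre-order (transform), while expressions are mostly transformed in post-order (transformExpressionsUp). ConstantFolding is the exception among the rules shown here, since it uses transformExpressionsDown.

2.1 Batch: Combine Limits

If two Limits appear, they are merged into one. This requires the inner Limit to be the direct child of the outer Limit; the rule binds the inner Limit's own child to the name grandChild.

    /**
     * Combines two adjacent [[Limit]] operators into one, merging the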
     * expressions into one single expression.
     */
    object CombineLimits extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // ll is the current Limit and le its expression; nl is ll's child Limit and ne its
        // expression; grandChild is nl's child.
        case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
          // Compare the two expressions: keep ne if it is smaller than le, otherwise keep le.
          Limit(If(LessThan(ne, le), ne, le), grandChild)
      }
    }

Given the SQL: val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ")

    scala> query.queryExecution.analyzed
    res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Limit 10
     Project [key#13,value#14]
      Limit 100
       Project [key#13,value#14]
        MetastoreRelation default, temp_shengli, None

The subquery has limit 100 and the outer query has limit 10. The subquery clearly does not need to fetch that many rows, because the outer query needs only 10, so the goal is to merge Limit 10 and Limit 100 into Limit 10. Note that in the analyzed plan above a Project sits between the two Limits, while the rule as written only matches a Limit whose direct child is another Limit.

2.2 Batch: ConstantFolding

This Batch contains the Rules NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, and SimplifyCaseConversionExpressions.

2.2.1 Rule: NullPropagation

First a word on Literal, since the rules below rely on it. Its companion object can wrap a value of any basic type in a Literal with the matching data type:

    object Literal {
      def apply(v: Any): Literal = v match {
        case i: Int => Literal(i, IntegerType)
        case l: Long => Literal(l, LongType)
        case d: Double => Literal(d, DoubleType)
        case f: Float => Literal(f, FloatType)
        case b: Byte => Literal(b, ByteType)
        case s: Short => Literal(s, ShortType)
        case s: String => Literal(s, StringType)
        case b: Boolean => Literal(b, BooleanType)
        case d: BigDecimal => Literal(d, DecimalType)
        case t: Timestamp => Literal(t, TimestampType)
        case a: Array[Byte] => Literal(a, BinaryType)
        case null => Literal(null, NullType)
      }
    }

Note that Literal is a LeafExpression. Its core method is eval, which computes the expression's value for a given Row:

    case class Literal(value: Any, dataType: DataType) extends LeafExpression {
      override def foldable = true
      def nullable = value == null
      def references = Set.empty
      override def toString = if (value != null) value.toString else "null"
      type EvaluatedType = Any
      override def eval(input: Row): Any = value
    }

Now let's look at what NullPropagation does.

NullPropagation replaces Expressions that can be statically evaluated with equivalent Literal values. In particular, it propagates null values from the bottom to the top of the expression tree at optimization time, so that they are not carried through the SQL syntax tree and evaluated later.

    /**
     * Replaces [[Expression Expressions]] that can be statically evaluated with
     * equivalent [[Literal]] values. This rule is more specific with
     * Null value propagation from bottom to top of the expression tree.
     */
    object NullPropagation extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case q: LogicalPlan => q transformExpressionsUp {
          // count(null) is replaced by the constant 0
          case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)
          // sum over a literal 0 is replaced by the constant 0
          case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)
          case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)
          case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
          case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
          case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
          case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
          case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
          case e @ Coalesce(children) => {
            val newChildren = children.filter(c => c match {
              case Literal(null, _) => false
              case _ => true
            })
            if (newChildren.length == 0) {
              Literal(null, e.dataType)
            } else if (newChildren.length == 1) {
              newChildren(0)
            } else {
              Coalesce(newChildren)
            }
          }
          case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true) trueValue else falseValue
          case e @ In(Literal(v, _), list) if (list.exists(c => c match {
              case Literal(candidate, _) if candidate == v => true
              case _ => false
            })) => Literal(true, BooleanType)
          // Put exceptional cases above if any
          case e: BinaryArithmetic => e.children match {
            case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
            case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
            case _ => e
          }
          case e: BinaryComparison => e.children match {
            case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
            case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
            case _ => e
          }
          case e: StringRegexExpression => e.children match {
            case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
            case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
            case _ => e
          }
        }
      }
    }

Given the SQL: val query = sql("select count(null) from temp_shengli where key is not null")
The analyzed plan:

    scala> query.queryExecution.analyzed
    res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Aggregate [], [COUNT(null) AS c0#5L]   // count is applied to null here
     Filter IS NOT NULL key#7
      MetastoreRelation default, temp_shengli, None

Applying NullPropagation:

    scala> NullPropagation(query.queryExecution.analyzed)
    res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Aggregate [], [CAST(0, LongType) AS c0#5L]   // after optimization it is the constant 0
     Filter IS NOT NULL key#7
      MetastoreRelation default, temp_shengli, None

2.2.2 Rule: ConstantFolding

Constant folding is a kind of Expression optimization. Constants that can be computed directly do not need to reach physical execution, where objects would be created to evaluate them; they can be computed right in the plan:

    object ConstantFolding extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {   // first transform the plan
        case q: LogicalPlan => q transformExpressionsDown {          // then transform each plan node's expressions
          // Skip redundant folding of literals.
          case l: Literal => l
          case e if e.foldable => Literal(e.eval(null), e.dataType)  // call eval to compute the result
        }
      }
    }

Given the SQL: val query = sql("select 1+2+3+4 from temp_shengli")

    scala> query.queryExecution.analyzed
    res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [(((1 + 2) + 3) + 4) AS c0#21]   // still a constant expression here
     MetastoreRelation default, src, None

After optimization:

    scala> query.queryExecution.optimizedPlan
    res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [10 AS c0#21]   // after optimization, folded directly into 10
     MetastoreRelation default, src, None

2.2.3 BooleanSimplification

This rule optimizes boolean expressions. It is a bit like short-circuit evaluation of boolean expressions in Java, though it is written rather elegantly here.

It checks whether the result of a two-sided boolean expression can be determined from one side alone, so that the other side does not have to be evaluated. This is called simplifying the boolean expression.

See my comments in the code for the explanation:

    /**
     * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
     * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
     * is only safe when evaluations of expressions does not result in side effects.
     */
    object BooleanSimplification extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case q: LogicalPlan => q transformExpressionsUp {
          // The boolean expression is an AND, i.e. exp1 AND exp2
          case and @ And(left, right) =>
            (left, right) match {   // (left expression, right expression)
              // left is true: the result is whatever the right side is
              case (Literal(true, BooleanType), r) => r
              // right is true: the result is whatever the left side is
              case (l, Literal(true, BooleanType)) => l
              // left is false: the result is false regardless of the right side
              case (Literal(false, BooleanType), _) => Literal(false)
              // as soon as either side is false, the result is false
              case (_, Literal(false, BooleanType)) => Literal(false)
              case (_, _) => and
            }

          case or @ Or(left, right) =>
            (left, right) match {
              // left is true: the result is true without looking at the right side
              case (Literal(true, BooleanType), _) => Literal(true)
              // as soon as either side is true, the result is true
              case (_, Literal(true, BooleanType)) => Literal(true)
              // left is false: the result is whatever the right side is
              case (Literal(false, BooleanType), r) => r
              // right is false: the result is whatever the left side is
              case (l, Literal(false, BooleanType)) => l
              case (_, _) => or
            }
        }
      }
    }

2.3 Batch: Filter Pushdown

Filter Pushdown contains CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, and ColumnPruning.

PS: the name Filter Pushdown does not really cover everything in the batch, ColumnPruning (column pruning) for example.

2.3.1 Combine Filters

This rule merges two adjacent Filters, much like Combine Limits above. Merging the two nodes reduces the depth of the tree and so avoids the cost of running a filter step twice.

    /**
     * Combines two adjacent [[Filter]] operators into one, merging the
     * conditions into one conjunctive predicate.
     */
    object CombineFilters extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
      }
    }

Given the SQL: val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ")

Before optimization, one Filter is the grandchild of the other, with a Project between them:

    scala> query.queryExecution.analyzed
    res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [key#27]
     Filter (key#27 > 80)      // filter > 80
      Project [key#27]
       Filter (key#27 > 100)   // filter > 100
        MetastoreRelation default, src, None

After optimization, the two conditions are expressed as one compound boolean expression in a single Filter. Since CombineFilters only matches directly adjacent Filters, the merge presumably happens once PushPredicateThroughProject in the same batch has moved the outer Filter below the Project:

    scala> query.queryExecution.optimizedPlan
    res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [key#27]
     Filter ((key#27 > 100) && (key#27 > 80))   // merged into one
      MetastoreRelation default, src, None

2.3.2 Filter Pushdown

Filter pushdown works by filtering out unneeded rows as early as possible to reduce cost.

Given the SQL: val query = sql("select key from (select * from temp_shengli)a where key>100")

The generated logical plan is:

    scala> query.queryExecution.analyzed
    res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [key#31]
     Filter (key#31 > 100)   // select key, value first, then Filter
      Project [key#31,value#32]
       MetastoreRelation default, src, None

The optimized plan is:

    scala> query.queryExecution.optimizedPlan
    res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [key#31]
     Filter (key#31 > 100)   // filter first, then select
      MetastoreRelation default, src, None

2.3.3 ColumnPruning

Column pruning is used a lot. It removes columns that do not need to be selected. It applies in three places:

1. In an aggregation, the child's columns can be pruned
2. In a join, the left and right children can be pruned
3. The columns of adjacent Projects can be merged

    object ColumnPruning extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Eliminate attributes that are not needed to calculate the specified aggregates.
        case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
          // If the child outputs attributes that the Aggregate does not reference,
          // wrap the child in a Project that keeps only a.references.
          a.copy(child = Project(a.references.toSeq, child))

        // Eliminate unneeded attributes from either side of a Join.
        case Project(projectList, Join(left, right, joinType, condition)) =>
          // Prune the columns of the join's left and right subtrees.
          // Collect the list of references required either above or to evaluate the condition.
          val allReferences: Set[Attribute] =
            projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

          /** Applies a projection only when the child is producing unnecessary attributes */
          def prunedChild(c: LogicalPlan) =
            if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
              Project(allReferences.filter(c.outputSet.contains).toSeq, c)
            } else {
              c
            }

          Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

        // Combine adjacent Projects.
        case Project(projectList1, Project(projectList2, child)) =>
          // Merge the columns of adjacent Projects.
          // Create a map of Aliases to their values from the child projection.
          // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
          val aliasMap = projectList2.collect {
            case a @ Alias(e, _) => (a.toAttribute: Expression, a)
          }.toMap

          // Substitute any attributes that are produced by the child projection, so that we safely
          // eliminate it.
          // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...'
          // TODO: Fix TransformBase to avoid the cast below.
          val substitutedProjection = projectList1.map(_.transform {
            case a if aliasMap.contains(a) => aliasMap(a)
          }).asInstanceOf[Seq[NamedExpression]]

          Project(substitutedProjection, child)

        // Eliminate no-op Projects
        case Project(projectList, child) if child.output == projectList => child
      }
    }

Here are three examples, one for each of the three cases.

1. In an aggregation, the child's columns can be pruned

Given the SQL: val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key")

Before optimization:

    res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
     Project [key#51,value#52]   // before optimization both key and value are selected
      MetastoreRelation default, temp_shengli, None

After optimization:

    scala> ColumnPruning1(query.queryExecution.analyzed)
    MetastoreRelation default, temp_shengli, None
    res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
     Project [key#51]   // after optimization value is pruned away and only key is selected
      MetastoreRelation default, temp_shengli, None

2. In a join, the left and right children can be pruned

Given the SQL: val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b on a.key =b.key ")

Before optimization:

    scala> query.queryExecution.analyzed
    res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [value#42 AS qween#39]
     Join Inner, Some((key#41 = key#43))
      Project [key#41,value#42]   // an extra column, value, is selected here
       MetastoreRelation default, temp_shengli, None
      Project [key#43,value#44]   // an extra column, value, is selected here
       MetastoreRelation default, temp_shengli, None

After optimization (ColumnPruning2 is my own copy of the rule, used for debugging). Note that the attribute IDs and the aliased column in this output (key#35 AS qween#33) do not match the analyzed plan above, so it appears to come from a run that selected a.key rather than a.value; in that case only key is required from both sides:

    scala> ColumnPruning2(query.queryExecution.analyzed)
    allReferences is -> Set(key#35, key#37)
    MetastoreRelation default, temp_shengli, None
    MetastoreRelation default, temp_shengli, None
    res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [key#35 AS qween#33]
     Join Inner, Some((key#35 = key#37))
      Project [key#35]   // after column pruning, the left child only needs to select key
       MetastoreRelation default, temp_shengli, None
      Project [key#37]   // after column pruning, the right child only needs to select key
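       MetastoreRelation default, temp_shengli, None

3. The columns of adjacent Projects can be merged and pruned

Given the SQL: val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a ")

Before optimization:

    scala> query.queryExecution.analyzed
    res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [(c#56 + 1) AS c0#57]
     Project [(1 + 1) AS c#56]
      MetastoreRelation default, temp_shengli, None

After optimization:

    scala> query.queryExecution.optimizedPlan
    res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [(2 AS c#56 + 1) AS c0#57]   // the subquery's c is substituted into the outer select, with 1 + 1 already folded to 2
     MetastoreRelation default, temp_shengli, None

3. Summary

This article described the role of the Optimizer in Catalyst: it takes the analyzed Logical Plan and applies Rules that transform both the Logical Plan and its Expressions, so that tree nodes are merged and optimized. The main optimization strategies fall into a few broad categories: merging, column pruning, and filter pushdown.

Catalyst is presumably still evolving. This article is based only on Spark 1.0.0, and optimization strategies added later will be covered in follow-up posts.

Discussion is welcome; let's improve together!

-- EOF --

Original article. When reposting, please credit: reposted from the blog of OopsOutOfMemory (盛利), author: OopsOutOfMemory

Permalink: http://blog.csdn.net/oopsoom/article/details/38121259

Note: this article is licensed under Attribution-NonCommercial-NoDerivs 2.5 China Mainland (CC BY-NC-ND 2.5 CN). Reposting, forwarding, and comments are welcome, but please keep the author attribution and the article link. For commercial use or licensing questions, please contact me.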
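Supplementary sketch: to make the mechanics from section 2 concrete, the following is a minimal, self-contained toy model of a rule-based optimizer. It is not Spark's Catalyst API. Every name in it (OptimizerSketch, Expr, Plan, IntLit, BoolLit, Attr, optimize, and so on) is hypothetical and exists only for illustration, and it runs all rules as a single batch rather than the three batches Optimizer defines. It mimics four ideas from the article: pre-order plan rewriting, post-order expression rewriting, a handful of rules modelled on CombineLimits, ConstantFolding, BooleanSimplification and CombineFilters, and re-running the rules until the plan stops changing, in the spirit of FixedPoint(100).

```scala
// Toy model only: hypothetical stand-ins, NOT Spark's Catalyst classes.
object OptimizerSketch {
  sealed trait Expr {
    // Post-order rewrite: children first, then the node itself.
    def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
      val withNewChildren: Expr = this match {
        case Add(l, r)         => Add(l.transformUp(rule), r.transformUp(rule))
        case And(l, r)         => And(l.transformUp(rule), r.transformUp(rule))
        case GreaterThan(l, r) => GreaterThan(l.transformUp(rule), r.transformUp(rule))
        case leaf              => leaf
      }
      rule.applyOrElse(withNewChildren, (e: Expr) => e)
    }
  }
  case class IntLit(value: Int) extends Expr
  case class BoolLit(value: Boolean) extends Expr
  case class Attr(name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr
  case class And(left: Expr, right: Expr) extends Expr
  case class GreaterThan(left: Expr, right: Expr) extends Expr

  sealed trait Plan {
    // Pre-order rewrite: the node itself first, then its children.
    def transformDown(rule: PartialFunction[Plan, Plan]): Plan =
      rule.applyOrElse(this, (p: Plan) => p) match {
        case Limit(n, c)     => Limit(n, c.transformDown(rule))
        case Filter(cond, c) => Filter(cond, c.transformDown(rule))
        case leaf            => leaf
      }
  }
  case class Relation(name: String) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan
  case class Filter(condition: Expr, child: Plan) extends Plan

  type Rule = Plan => Plan

  // Two directly nested Limits collapse into the smaller one.
  val combineLimits: Rule = _.transformDown {
    case Limit(outer, Limit(inner, grandChild)) => Limit(math.min(outer, inner), grandChild)
  }
  // Two directly nested Filters collapse into one conjunctive Filter.
  val combineFilters: Rule = _.transformDown {
    case Filter(fc, Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
  // Evaluate operators whose operands are all literals.
  val constantFolding: Rule = _.transformDown {
    case Filter(cond, child) => Filter(cond.transformUp {
      case Add(IntLit(a), IntLit(b))         => IntLit(a + b)
      case GreaterThan(IntLit(a), IntLit(b)) => BoolLit(a > b)
    }, child)
  }
  // Decide an AND from one side when that side is a boolean literal.
  val booleanSimplification: Rule = _.transformDown {
    case Filter(cond, child) => Filter(cond.transformUp {
      case And(BoolLit(true), r)  => r
      case And(l, BoolLit(true))  => l
      case And(BoolLit(false), _) => BoolLit(false)
      case And(_, BoolLit(false)) => BoolLit(false)
    }, child)
  }
  val allRules: Seq[Rule] =
    Seq(combineLimits, constantFolding, booleanSimplification, combineFilters)

  // Re-apply all rules in order until nothing changes or maxIterations is reached.
  def optimize(plan: Plan, rules: Seq[Rule] = allRules, maxIterations: Int = 100): Plan = {
    var current = plan
    var changed = true
    var iteration = 0
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val keyGt = (e: Expr) => GreaterThan(Attr("key"), e)
    val plan = Limit(10, Limit(100,
      Filter(keyGt(Add(IntLit(1), IntLit(2))),
        Filter(And(BoolLit(true), keyGt(IntLit(80))), Relation("temp_shengli")))))
    println(optimize(plan))
  }
}
```

In this toy, the plan built in main collapses to a single Limit 10 over a single Filter whose condition is (key > 80) AND (key > 3). The fixed-point loop matters even here: because the plan is rewritten top-down, three nested Limits need two passes of combineLimits before only one Limit remains, which is one plausible reason for running each batch up to a bounded number of iterations.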