Spark SQL Internals Series, Part 6 (reposted from OopsOutOfMemory)
/** Spark SQL source code analysis series */

The previous articles covered the overall Spark SQL execution flow in the sql package, and the SqlParser, Analyzer, and Optimizer inside the catalyst package. This article covers the last plan in Catalyst: the Physical Plan. The physical plan is the final plan in the pipeline, and the prerequisite for Spark SQL to actually run a Spark job.

(Figure: the Catalyst plan pipeline.)

1. SparkPlanner

Picking up where we left off: after the Optimizer has processed the Analyzed Logical Plan, the SparkPlanner converts the Optimized Logical Plan into physical plans:

```scala
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next()
```

SparkPlanner's apply method returns an Iterator[PhysicalPlan]. SparkPlanner extends SparkStrategies, which in turn extends QueryPlanner. SparkStrategies contains a set of concrete strategies; each extends the Strategy class defined in QueryPlanner, which accepts a Logical Plan and produces a sequence of Physical Plans:

```scala
@transient
protected[sql] val planner = new SparkPlanner

protected[sql] class SparkPlanner extends SparkStrategies {
  val sparkContext: SparkContext = self.sparkContext
  val sqlContext: SQLContext = self
  def numPartitions = self.numShufflePartitions  // number of shuffle partitions

  val strategies: Seq[Strategy] =                // the set of strategies
    CommandStrategy(self) ::
    TakeOrdered ::
    PartialAggregation ::
    LeftSemiJoin ::
    HashJoin ::
    InMemoryScans ::
    ParquetOperations ::
    BasicOperators ::
    CartesianProduct ::
    BroadcastNestedLoopJoin :: Nil

  // etc...
}
```

QueryPlanner is SparkPlanner's base class. It defines the key pieces: Strategy, planLater, and apply.

```scala
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  /** A list of execution strategies that can be used by the planner */
  def strategies: Seq[Strategy]

  /**
   * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that
   * can be used for execution. If this strategy does not apply to the given logical operation
   * then an empty list should be returned.
   */
  abstract protected class Strategy extends Logging {
    def apply(plan: LogicalPlan): Seq[PhysicalPlan]  // takes a logical plan, returns Seq[PhysicalPlan]
  }

  /**
   * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
   * filled in automatically by the QueryPlanner using the other execution strategies that are
   * available.
   */
  protected def planLater(plan: LogicalPlan) = apply(plan).next()  // the placeholder is filled in by applying the other strategies

  def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...
    val iter = strategies.view.flatMap(_(plan)).toIterator  // apply every strategy to the plan, concatenating all resulting physical plans into one iterator
    assert(iter.hasNext, s"No plan for $plan")
    iter  // return all physical plans
  }
}
```

(Figure: QueryPlanner inheritance hierarchy.)

2. SparkPlan

SparkPlan is the abstract class for the final physical execution plan in Catalyst, produced once all the Strategies have been applied; it is what actually runs the Spark job.

```scala
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
```

prepareForExecution is a RuleExecutor[SparkPlan] — here the rules operate on SparkPlans:

```scala
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
  val batches =
    Batch("Add exchange", Once, AddExchange(self)) ::                         // add a shuffle (Exchange) where necessary
    Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil  // bind references
}
```

SparkPlan extends QueryPlan[SparkPlan] and defines the output partitioning, the required child distribution, and the execute method that Spark SQL invokes to run the query:

```scala
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
  self: Product =>

  // TODO: Move to `DistributedPlan`
  /** Specifies how data is partitioned across different nodes in the cluster. */
  def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!

  /** Specifies any partition requirements on the input data for this operator. */
  def requiredChildDistribution: Seq[Distribution] =
    Seq.fill(children.size)(UnspecifiedDistribution)

  /**
   * Runs this query returning the result as an RDD.
   */
  def execute(): RDD[Row]  // execute() actually runs the query, returning an RDD

  /**
   * Runs this query returning the result as an array.
   */
  def executeCollect(): Array[Row] = execute().map(_.copy()).collect()  // execute & collect

  protected def buildRow(values: Seq[Any]): Row =  // builds a Row (essentially a wrapper around an Array) from the current values
    new GenericRow(values.toArray)
}
```

(Figure: SparkPlan inheritance hierarchy.)

3. Strategies

Note that Strategy here lives in the execution package. SparkPlanner currently defines these strategies: LeftSemiJoin, HashJoin, PartialAggregation, BroadcastNestedLoopJoin, CartesianProduct, TakeOrdered, ParquetOperations, InMemoryScans, BasicOperators, CommandStrategy.

3.1 LeftSemiJoin

There are several join types:

```scala
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
```

This strategy fires when the Join in the Logical Plan has joinType LeftSemi. ExtractEquiJoinKeys is a pattern defined in patterns.scala, used for pattern matching: any equi-join is matched by ExtractEquiJoinKeys, which deconstructs the join and yields (joinType, leftKeys, rightKeys, condition, leftChild, rightChild). The strategy then returns an execution.LeftSemiJoinHash SparkPlan (see the SparkPlan class hierarchy above).

```scala
object LeftSemiJoin extends Strategy with PredicateHelper {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Find left semi joins where at least some predicates can be evaluated by matching join keys
    case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
      val semiJoin = execution.LeftSemiJoinHash(  // for an equi left-semi join, build a LeftSemiJoinHash SparkPlan
        leftKeys, rightKeys, planLater(left), planLater(right))
      condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
    // no predicate can be evaluated by matching hash keys
    case logical.Join(left, right, LeftSemi, condition) =>  // no join keys, i.e. a non-equi join: fall back to LeftSemiJoinBNL
      execution.LeftSemiJoinBNL(
        planLater(left), planLater(right), condition)(sqlContext) :: Nil
    case _ => Nil
  }
}
```

3.2 HashJoin

HashJoin is the most common case, for the Inner join type. It provides two SparkPlans: BroadcastHashJoin and ShuffledHashJoin.

BroadcastHashJoin is implemented with broadcast variables: for tables listed in the spark.sql.join.broadcastTables parameter (table names, comma-separated), Spark first materializes the table and broadcasts it to every machine using Broadcast Variables, much like a map join in Hive.
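The planner mechanics described above — a list of strategies, each returning Nil or a list of candidate physical plans, combined with `strategies.view.flatMap(_(plan))` and recursed via `planLater` — can be sketched with a small self-contained model. This is a toy reconstruction for illustration only, not Spark's actual API; all of the names (ToyPlanner, Scan, Limit, and so on) are invented here:

```scala
// A minimal model of Catalyst's planner loop (illustrative assumption, not real Spark code).
sealed trait LogicalPlan
case class Scan(table: String) extends LogicalPlan
case class Limit(n: Int, child: LogicalPlan) extends LogicalPlan

sealed trait PhysicalPlan
case class PhysicalScan(table: String) extends PhysicalPlan
case class PhysicalLimit(n: Int, child: PhysicalPlan) extends PhysicalPlan

object ToyPlanner {
  // A strategy either claims a node or returns Nil, like Strategy.apply in QueryPlanner.
  type Strategy = LogicalPlan => Seq[PhysicalPlan]

  val limitStrategy: Strategy = {
    case Limit(n, child) => PhysicalLimit(n, plan(child)) :: Nil  // plan(child) stands in for planLater
    case _               => Nil
  }
  val scanStrategy: Strategy = {
    case Scan(t) => PhysicalScan(t) :: Nil
    case _       => Nil
  }
  val strategies: Seq[Strategy] = Seq(limitStrategy, scanStrategy)

  // Mirrors `strategies.view.flatMap(_(plan)).toIterator` plus the
  // "TODO: Don't just pick the first one" in SQLContext.
  def plan(p: LogicalPlan): PhysicalPlan = {
    val iter = strategies.view.flatMap(s => s(p)).iterator
    assert(iter.hasNext, s"No plan for $p")
    iter.next()
  }
}

// ToyPlanner.plan(Limit(10, Scan("t"))) yields PhysicalLimit(10, PhysicalScan("t"))
```

Because the view is lazy, later strategies are only evaluated if earlier ones return Nil for the node at hand — which is why the ordering of `strategies` in SparkPlanner (specialized strategies before BasicOperators) matters.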
ShuffledHashJoin is the traditional default join: a hash join performed after shuffling on the join keys.

```scala
object HashJoin extends Strategy with PredicateHelper {
  private[this] def broadcastHashJoin(
      leftKeys: Seq[Expression],
      rightKeys: Seq[Expression],
      left: LogicalPlan,
      right: LogicalPlan,
      condition: Option[Expression],
      side: BuildSide) = {
    val broadcastHashJoin = execution.BroadcastHashJoin(
      leftKeys, rightKeys, side, planLater(left), planLater(right))(sqlContext)
    condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
  }

  def broadcastTables: Seq[String] =
    sqlContext.joinBroadcastTables.split(",").toBuffer  // the tables to broadcast

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ExtractEquiJoinKeys(
           Inner,
           leftKeys,
           rightKeys,
           condition,
           left,
           right @ PhysicalOperation(_, _, b: BaseRelation))
        if broadcastTables.contains(b.tableName) =>  // the right child is a broadcast table, so buildSide is BuildRight
      broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

    case ExtractEquiJoinKeys(
           Inner,
           leftKeys,
           rightKeys,
           condition,
           left @ PhysicalOperation(_, _, b: BaseRelation),
           right)
        if broadcastTables.contains(b.tableName) =>  // the left child is a broadcast table, so buildSide is BuildLeft
      broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)

    case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
      val hashJoin =
        execution.ShuffledHashJoin(  // hash join after shuffling on the hash keys
          leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
      condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil

    case _ => Nil
  }
}
```

3.3 PartialAggregation

PartialAggregation is a partial-aggregation strategy: aggregations that can be computed on local data are computed locally first, so that the full set of rows does not have to be shuffled.

```scala
object PartialAggregation extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.Aggregate(groupingExpressions, aggregateExpressions, child) =>
      // Collect all aggregate expressions.
      val allAggregates =
        aggregateExpressions.flatMap(_ collect { case a: AggregateExpression => a })
      // Collect all aggregate expressions that can be computed partially.
      val partialAggregates =
        aggregateExpressions.flatMap(_ collect { case p: PartialAggregate => p })

      // Only do partial aggregation if supported by all aggregate expressions.
      if (allAggregates.size == partialAggregates.size) {
        // Create a map of expressions to their partial evaluations for all aggregate expressions.
        val partialEvaluations: Map[Long, SplitEvaluation] =
          partialAggregates.map(a => (a.id, a.asPartial)).toMap

        // We need to pass all grouping expressions though so the grouping can happen a second
        // time. However some of them might be unnamed so we alias them allowing them to be
        // referenced in the second aggregation.
        val namedGroupingExpressions: Map[Expression, NamedExpression] = groupingExpressions.map {
          case n: NamedExpression => (n, n)
          case other => (other, Alias(other, "PartialGroup")())
        }.toMap

        // Replace aggregations with a new expression that computes the result from the already
        // computed partial evaluations and grouping values.
        val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp {
          case e: Expression if partialEvaluations.contains(e.id) =>
            partialEvaluations(e.id).finalEvaluation
          case e: Expression if namedGroupingExpressions.contains(e) =>
            namedGroupingExpressions(e).toAttribute
        }).asInstanceOf[Seq[NamedExpression]]

        val partialComputation =
          (namedGroupingExpressions.values ++
           partialEvaluations.values.flatMap(_.partialEvaluations)).toSeq

        // Construct two phased aggregation.
        execution.Aggregate(  // the outer, final Aggregate SparkPlan wraps an inner, partial one
          partial = false,
          namedGroupingExpressions.values.map(_.toAttribute).toSeq,
          rewrittenAggregateExpressions,
          execution.Aggregate(
            partial = true,
            groupingExpressions,
            partialComputation,
            planLater(child))(sqlContext))(sqlContext) :: Nil
      } else {
        Nil
      }
    case _ => Nil
  }
}
```

3.4 BroadcastNestedLoopJoin

BroadcastNestedLoopJoin handles the LeftOuter, RightOuter, and FullOuter join types, whereas the HashJoin strategy above handles only inner joins — an important distinction.

```scala
object BroadcastNestedLoopJoin extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.Join(left, right, joinType, condition) =>
      execution.BroadcastNestedLoopJoin(
        planLater(left), planLater(right), joinType, condition)(sqlContext) :: Nil
    case _ => Nil
  }
}
```

An excerpt from its implementation:

```scala
      if (!matched && (joinType == LeftOuter || joinType == FullOuter)) {  // LeftOuter or FullOuter
        matchedRows += buildRow(streamedRow ++ Array.fill(right.output.size)(null))
      }
    }
    Iterator((matchedRows, includedBroadcastTuples))
  }

val includedBroadcastTuples = streamedPlusMatches.map(_._2)
val allIncludedBroadcastTuples =
  if (includedBroadcastTuples.count == 0) {
    new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
  } else {
    streamedPlusMatches.map(_._2).reduce(_ ++ _)
  }

val rightOuterMatches: Seq[Row] =
  if (joinType == RightOuter || joinType == FullOuter) {  // RightOuter or FullOuter
    broadcastedRelation.value.zipWithIndex.filter {
      case (row, i) => !allIncludedBroadcastTuples.contains(i)
    }.map {
      // TODO: Use projection.
      case (row, _) => buildRow(Vector.fill(left.output.size)(null) ++ row)
    }
  } else {
    Vector()
  }
```

3.5 CartesianProduct

The Cartesian-product join, including joins with a pending filter condition. It is implemented mainly with RDD's cartesian method.

```scala
object CartesianProduct extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.Join(left, right, _, None) =>
      execution.CartesianProduct(planLater(left), planLater(right)) :: Nil
    case logical.Join(left, right, Inner, Some(condition)) =>
      execution.Filter(condition,
        execution.CartesianProduct(planLater(left), planLater(right))) :: Nil
    case _ => Nil
  }
}
```

3.6 TakeOrdered

TakeOrdered handles Limit: when both a Limit and a Sort are present, it returns a TakeOrdered SparkPlan, implemented mainly with RDD's takeOrdered method — sort, then take the top N.

```scala
object TakeOrdered extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>
      execution.TakeOrdered(limit, order, planLater(child))(sqlContext) :: Nil
    case _ => Nil
  }
}
```

3.7 ParquetOperations

Supports Parquet reads and writes, inserting into tables, and so on.

```scala
object ParquetOperations extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // TODO: need to support writing to other types of files. Unify the below code paths.
    case logical.WriteToFile(path, child) =>
      val relation =
        ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
      // Note: overwrite=false because otherwise the metadata we just created will be deleted
      InsertIntoParquetTable(relation, planLater(child), overwrite = false)(sqlContext) :: Nil
    case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
      InsertIntoParquetTable(table, planLater(child), overwrite)(sqlContext) :: Nil
    case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
      val prunePushedDownFilters =
        if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
          (filters: Seq[Expression]) => {
            filters.filter { filter =>
              // Note: filters cannot be pushed down to Parquet if they contain more complex
              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
              // all filters that have been pushed down. Note that a predicate such as
              // "(A AND B) OR C" can result in "A OR C" being pushed down.
              val recordFilter = ParquetFilters.createFilter(filter)
              if (!recordFilter.isDefined) {
                // First case: the pushdown did not result in any record filter.
                true
              } else {
                // Second case: a record filter was created; here we are conservative in
                // the sense that even if "A" was pushed and we check for "A AND B" we
                // still want to keep "A AND B" in the higher-level filter, not just "B".
                !ParquetFilters.findExpression(recordFilter.get, filter).isDefined
              }
            }
          }
        } else {
          identity[Seq[Expression]] _
        }
      pruneFilterProject(
        projectList,
        filters,
        prunePushedDownFilters,
        ParquetTableScan(_, relation, filters)(sqlContext)) :: Nil
    case _ => Nil
  }
}
```

3.8 InMemoryScans

InMemoryScans operates on the InMemoryRelation logical plan, delegating to SparkPlanner's pruneFilterProject method.

```scala
object InMemoryScans extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
      pruneFilterProject(
        projectList,
        filters,
        identity[Seq[Expression]],  // No filters are pushed down.
        InMemoryColumnarTableScan(_, mem)) :: Nil
    case _ => Nil
  }
}
```

3.9 BasicOperators

The basic SparkPlans defined in basicOperators.scala under org.apache.spark.sql.execution: Project, Filter, Sample, Union, Limit, TakeOrdered, Sort, ExistingRdd. These are the building blocks; their implementations are relatively simple, mostly thin wrappers over RDD methods.

```scala
object BasicOperators extends Strategy {
  def numPartitions = self.numPartitions

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.Distinct(child) =>
      execution.Aggregate(
        partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
    case logical.Sort(sortExprs, child) =>
      // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
      execution.Sort(sortExprs, global = true, planLater(child)) :: Nil
    case logical.SortPartitions(sortExprs, child) =>
      // This sort only sorts tuples within a partition. Its requiredDistribution will be
      // an UnspecifiedDistribution.
      execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
    case logical.Project(projectList, child) =>
      execution.Project(projectList, planLater(child)) :: Nil
    case logical.Filter(condition, child) =>
      execution.Filter(condition, planLater(child)) :: Nil
    case logical.Aggregate(group, agg, child) =>
      execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil
    case logical.Sample(fraction, withReplacement, seed, child) =>
      execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil
    case logical.LocalRelation(output, data) =>
      val dataAsRdd =
        sparkContext.parallelize(data.map(r =>
          new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row))
      execution.ExistingRdd(output, dataAsRdd) :: Nil
    case logical.Limit(IntegerLiteral(limit), child) =>
      execution.Limit(limit, planLater(child))(sqlContext) :: Nil
    case Unions(unionChildren) =>
      execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
    case logical.Generate(generator, join, outer, _, child) =>
      execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
    case logical.NoRelation =>
      execution.ExistingRdd(Nil, singleRowRdd) :: Nil
    case logical.Repartition(expressions, child) =>
      execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
    case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil
    case _ => Nil
  }
}
```

3.10 CommandStrategy

CommandStrategy targets Command-type Logical Plans, i.e. operations such as `set key = value`, `explain sql`, and `cache table xxx`. SetCommand is implemented through the context's configuration parameters; ExplainCommand works by printing the tree string of the executed plan; CacheCommand is implemented via the context's cache table / uncache table support.

```scala
case class CommandStrategy(context: SQLContext) extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.SetCommand(key, value) =>
      Seq(execution.SetCommand(key, value, plan.output)(context))
    case logical.ExplainCommand(logicalPlan) =>
      Seq(execution.ExplainCommand(logicalPlan, plan.output)(context))
    case logical.CacheCommand(tableName, cache) =>
      Seq(execution.CacheCommand(tableName, cache)(context))
    case _ => Nil
  }
}
```

4. Execution

Every SparkPlan is executed by calling its execute() method to produce an RDD. Apart from the simple cases — the basic operators above are straightforward — most implementations are fairly involved. The broad strokes were covered above; this article does not go into further detail.

5. Summary

This article introduced the Physical Plan stage of Spark SQL's Catalyst framework and the process of turning an Optimized Logical Plan into a SparkPlan. The conversion applies a series of physical-planning Strategies through the QueryPlanner, and the chosen plan is then post-processed by a RuleExecutor (prepareForExecution) into the final executed SparkPlan.

The SparkPlan is the last plan before execution: once the executed SparkPlan has been generated, calling executeCollect() — which runs execute() and collects the resulting RDD — launches the Spark job that actually runs the Spark SQL query.

—— EOF ——

Original article; please attribute when reposting: from the blog of OopsOutOfMemory (Sheng Li), author: OopsOutOfMemory.
Permalink: http://blog.csdn.net/oopsoom/article/details/38235247
Note: this article is published under the CC BY-NC-ND 2.5 CN license (Attribution-NonCommercial-NoDerivatives 2.5 China Mainland). Reposting, sharing, and commenting are welcome, but please keep the author attribution and the article link. For commercial use or licensing questions, please contact me.