首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

Spark-SparkSQL深入学习系列六(转自OopsOutOfMemory)

/**Spark SQL源码分析系列文章*/ 前面几篇文章主要介绍的是Sparksql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan。物理计划是Spark SQL执行Spark job的前置,也是最后一道计划。 如图: 一、SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有SparkPlanner来对Optimized Logical Plan进行转换,生成Physical plans。 [java] view plain copy lazyvaloptimizedPlan=optimizer(analyzed) //TODO:Don'tjustpickthefirstone... lazyvalsparkPlan=planner(optimizedPlan).next() SparkPlanner的apply方法,会返回一个Iterator[PhysicalPlan]。 SparkPlanner继承了SparkStrategies,SparkStrategies继承了QueryPlanner。 SparkStrategies包含了一系列特定的Strategies,这些Strategies是继承自QueryPlanner中定义的Strategy,它定义接受一个Logical Plan,生成一系列的Physical Plan [java] view plain copy @transient protected[sql]valplanner=newSparkPlanner protected[sql]classSparkPlannerextendsSparkStrategies{ valsparkContext:SparkContext=self.sparkContext valsqlContext:SQLContext=self defnumPartitions=self.numShufflePartitions//partitions的个数 valstrategies:Seq[Strategy]=//策略的集合 CommandStrategy(self):: TakeOrdered:: PartialAggregation:: LeftSemiJoin:: HashJoin:: InMemoryScans:: ParquetOperations:: BasicOperators:: CartesianProduct:: BroadcastNestedLoopJoin::Nil etc...... } QueryPlanner 是SparkPlanner的基类,定义了一系列的关键点,如Strategy,planLater和apply。 [java] view plain copy abstractclassQueryPlanner[PhysicalPlan<:TreeNode[PhysicalPlan]]{ /**Alistofexecutionstrategiesthatcanbeusedbytheplanner*/ defstrategies:Seq[Strategy] /** *Givena[[plans.logical.LogicalPlanLogicalPlan]],returnsalistof`PhysicalPlan`sthatcan *beusedforexecution.Ifthisstrategydoesnotapplytothegivelogicaloperationthenan *emptylistshouldbereturned. */ abstractprotectedclassStrategyextendsLogging{ defapply(plan:LogicalPlan):Seq[PhysicalPlan]//接受一个logicalplan,返回Seq[PhysicalPlan] } /** *Returnsaplaceholderforaphysicalplanthatexecutes`plan`.Thisplaceholderwillbe *filledinautomaticallybytheQueryPlannerusingtheotherexecutionstrategiesthatare *available. */ protecteddefplanLater(plan:LogicalPlan)=apply(plan).next()//返回一个占位符,占位符会自动被QueryPlanner用其它的strategiesapply defapply(plan:LogicalPlan):Iterator[PhysicalPlan]={ //Obviouslyalottodoherestill... valiter=strategies.view.flatMap(_(plan)).toIterator//整合所有的Strategy,_(plan)每个Strategy应用plan上,得到所有Strategies执行完后生成的所有PhysicalPlan的集合,一个iter assert(iter.hasNext,s"Noplanfor$plan") iter//返回所有物理计划 } } 继承关系: 二、Spark Plan Spark Plan是Catalyst里经过所有Strategies apply 的最终的物理执行计划的抽象类,它只是用来执行spark job的。 [java] view plain copy lazyvalexecutedPlan:SparkPlan=prepareForExecution(sparkPlan) prepareForExecution其实是一个RuleExecutor[SparkPlan],当然这里的Rule就是SparkPlan了。 [java] view plain copy @transient protected[sql]valprepareForExecution=newRuleExecutor[SparkPlan]{ valbatches= Batch("Addexchange",Once,AddExchange(self)):://添加shuffler操作如果必要的话 Batch("PrepareExpressions",Once,newBindReferences[SparkPlan])::Nil//Bindreferences } Spark Plan继承Query Plan[Spark Plan],里面定义的partition,requiredChildDistribution以及spark sql启动执行的execute方法。 [java] view plain copy abstractclassSparkPlanextendsQueryPlan[SparkPlan]withLogging{ self:Product=> //TODO:Moveto`DistributedPlan` /**Specifieshowdataispartitionedacrossdifferentnodesinthecluster.*/ defoutputPartitioning:Partitioning=UnknownPartitioning(0)//TODO:WRONGWIDTH! /**Specifiesanypartitionrequirementsontheinputdataforthisoperator.*/ defrequiredChildDistribution:Seq[Distribution]= Seq.fill(children.size)(UnspecifiedDistribution) /** *RunsthisqueryreturningtheresultasanRDD. */ defexecute():RDD[Row]//真正执行查询的方法execute,返回的是一个RDD /** *Runsthisqueryreturningtheresultasanarray. */ defexecuteCollect():Array[Row]=execute().map(_.copy()).collect()//exe&collect protecteddefbuildRow(values:Seq[Any]):Row=//根据当前的值,生成Row对象,其实是一个封装了Array的对象。 newGenericRow(values.toArray) } 关于Spark Plan的继承关系,如图: 三、Strategies Strategy,注意这里Strategy是在execution包下的,在SparkPlanner里定义了目前的几种策略: LeftSemiJoin、HashJoin、PartialAggregation、BroadcastNestedLoopJoin、CartesianProduct、TakeOrdered、ParquetOperations、InMemoryScans、BasicOperators、CommandStrategy 3.1、LeftSemiJoin Join分为好几种类型: [java] view plain copy caseobjectInnerextendsJoinType caseobjectLeftOuterextendsJoinType caseobjectRightOuterextendsJoinType caseobjectFullOuterextendsJoinType caseobjectLeftSemiextendsJoinType 如果Logical Plan里的Join是joinType为LeftSemi的话,就会执行这种策略, 这里ExtractEquiJoinKeys是一个pattern定义在patterns.scala里,主要是做模式匹配用的。 这里匹配只要是等值的join操作,都会封装为ExtractEquiJoinKeys对象,它会解析当前join,最后返回(joinType, rightKeys, leftKeys, condition, leftChild, rightChild)的格式。 最后返回一个execution.LeftSemiJoinHash这个Spark Plan,可见Spark Plan的类图继承关系图。 [java] view plain copy objectLeftSemiJoinextendsStrategywithPredicateHelper{ defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ //Findleftsemijoinswhereatleastsomepredicatescanbeevaluatedbymatchingjoinkeys caseExtractEquiJoinKeys(LeftSemi,leftKeys,rightKeys,condition,left,right)=> valsemiJoin=execution.LeftSemiJoinHash(//根据解析后的Join,实例化execution.LeftSemiJoinHash这个SparkPlan返回 leftKeys,rightKeys,planLater(left),planLater(right)) condition.map(Filter(_,semiJoin)).getOrElse(semiJoin)::Nil //nopredicatecanbeevaluatedbymatchinghashkeys caselogical.Join(left,right,LeftSemi,condition)=>//没有Joinkey的,即非等值join连接的,返回LeftSemiJoinBNL这个SparkPlan execution.LeftSemiJoinBNL( planLater(left),planLater(right),condition)(sqlContext)::Nil case_=>Nil } } 3.2、HashJoin HashJoin是我们最见的操作,innerJoin类型,里面提供了2种Spark Plan,BroadcastHashJoin 和 ShuffledHashJoin BroadcastHashJoin的实现是一种广播变量的实现方法,如果设置了spark.sql.join.broadcastTables这个参数的表(表面逗号隔开) 就会用spark的Broadcast Variables方式先将一张表给查询出来,然后广播到各个机器中,相当于Hive中的map join。 ShuffledHashJoin是一种最传统的默认的join方式,会根据shuffle key进行shuffle的hash join。 [java] view plain copy objectHashJoinextendsStrategywithPredicateHelper{ private[this]defbroadcastHashJoin( leftKeys:Seq[Expression], rightKeys:Seq[Expression], left:LogicalPlan, right:LogicalPlan, condition:Option[Expression], side:BuildSide)={ valbroadcastHashJoin=execution.BroadcastHashJoin( leftKeys,rightKeys,side,planLater(left),planLater(right))(sqlContext) condition.map(Filter(_,broadcastHashJoin)).getOrElse(broadcastHashJoin)::Nil } defbroadcastTables:Seq[String]=sqlContext.joinBroadcastTables.split(",").toBuffer//获取需要广播的表 defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ caseExtractEquiJoinKeys( Inner, leftKeys, rightKeys, condition, left, right@PhysicalOperation(_,_,b:BaseRelation)) ifbroadcastTables.contains(b.tableName)=>//如果右孩子是广播的表,则buildSide取BuildRight broadcastHashJoin(leftKeys,rightKeys,left,right,condition,BuildRight) caseExtractEquiJoinKeys( Inner, leftKeys, rightKeys, condition, left@PhysicalOperation(_,_,b:BaseRelation), right) ifbroadcastTables.contains(b.tableName)=>//如果左孩子是广播的表,则buildSide取BuildLeft broadcastHashJoin(leftKeys,rightKeys,left,right,condition,BuildLeft) caseExtractEquiJoinKeys(Inner,leftKeys,rightKeys,condition,left,right)=> valhashJoin= execution.ShuffledHashJoin(//根据hashkeyshuffle的HashJoin leftKeys,rightKeys,BuildRight,planLater(left),planLater(right)) condition.map(Filter(_,hashJoin)).getOrElse(hashJoin)::Nil case_=>Nil } } 3.3、PartialAggregation PartialAggregation是一个部分聚合的策略,即有些聚合操作可以在local里面完成的,就在local data里完成,而不必要的去shuffle所有的字段。 [java] view plain copy objectPartialAggregationextendsStrategy{ defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ caselogical.Aggregate(groupingExpressions,aggregateExpressions,child)=> //Collectallaggregateexpressions. valallAggregates= aggregateExpressions.flatMap(_collect{casea:AggregateExpression=>a}) //Collectallaggregateexpressionsthatcanbecomputedpartially. valpartialAggregates= aggregateExpressions.flatMap(_collect{casep:PartialAggregate=>p}) //Onlydopartialaggregationifsupportedbyallaggregateexpressions. if(allAggregates.size==partialAggregates.size){ //Createamapofexpressionstotheirpartialevaluationsforallaggregateexpressions. valpartialEvaluations:Map[Long,SplitEvaluation]= partialAggregates.map(a=>(a.id,a.asPartial)).toMap //Weneedtopassallgroupingexpressionsthoughsothegroupingcanhappenasecond //time.Howeversomeofthemmightbeunnamedsowealiasthemallowingthemtobe //referencedinthesecondaggregation. valnamedGroupingExpressions:Map[Expression,NamedExpression]=groupingExpressions.map{ casen:NamedExpression=>(n,n) caseother=>(other,Alias(other,"PartialGroup")()) }.toMap //Replaceaggregationswithanewexpressionthatcomputestheresultfromthealready //computedpartialevaluationsandgroupingvalues. valrewrittenAggregateExpressions=aggregateExpressions.map(_.transformUp{ casee:ExpressionifpartialEvaluations.contains(e.id)=> partialEvaluations(e.id).finalEvaluation casee:ExpressionifnamedGroupingExpressions.contains(e)=> namedGroupingExpressions(e).toAttribute }).asInstanceOf[Seq[NamedExpression]] valpartialComputation= (namedGroupingExpressions.values++ partialEvaluations.values.flatMap(_.partialEvaluations)).toSeq //Constructtwophasedaggregation. execution.Aggregate(//返回execution.Aggregate这个SparkPlan partial=false, namedGroupingExpressions.values.map(_.toAttribute).toSeq, rewrittenAggregateExpressions, execution.Aggregate( partial=true, groupingExpressions, partialComputation, planLater(child))(sqlContext))(sqlContext)::Nil }else{ Nil } case_=>Nil } } 3.4、BroadcastNestedLoopJoin BroadcastNestedLoopJoin是用于Left Outer Join, RightOuter, FullOuter这三种类型的join 而上述的Hash Join仅仅用于InnerJoin,这点要区分开来。 [java] view plain copy objectBroadcastNestedLoopJoinextendsStrategy{ defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ caselogical.Join(left,right,joinType,condition)=> execution.BroadcastNestedLoopJoin( planLater(left),planLater(right),joinType,condition)(sqlContext)::Nil case_=>Nil } } 部分代码; [java] view plain copy if(!matched&&(joinType==LeftOuter||joinType==FullOuter)){//LeftOuterorFullOuter matchedRows+=buildRow(streamedRow++Array.fill(right.output.size)(null)) } } Iterator((matchedRows,includedBroadcastTuples)) } valincludedBroadcastTuples=streamedPlusMatches.map(_._2) valallIncludedBroadcastTuples= if(includedBroadcastTuples.count==0){ newscala.collection.mutable.BitSet(broadcastedRelation.value.size) }else{ streamedPlusMatches.map(_._2).reduce(_++_) } valrightOuterMatches:Seq[Row]= if(joinType==RightOuter||joinType==FullOuter){//RightOuterorFullOuter broadcastedRelation.value.zipWithIndex.filter{ case(row,i)=>!allIncludedBroadcastTuples.contains(i) }.map{ //TODO:Useprojection. case(row,_)=>buildRow(Vector.fill(left.output.size)(null)++row) } }else{ Vector() } 3.5、CartesianProduct [java] view plain copy 笛卡尔积的Join,有待过滤条件的Join。 主要是利用RDD的cartesian实现的。 objectCartesianProductextendsStrategy{ defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ caselogical.Join(left,right,_,None)=> execution.CartesianProduct(planLater(left),planLater(right))::Nil caselogical.Join(left,right,Inner,Some(condition))=> execution.Filter(condition, execution.CartesianProduct(planLater(left),planLater(right)))::Nil case_=>Nil } } 3.6、TakeOrdered TakeOrdered是用于Limit操作的,如果有Limit和Sort操作。 则返回一个TakeOrdered的Spark Plan。 主要也是利用RDD的takeOrdered方法来实现的排序后取TopN。 [java] view plain copy objectTakeOrderedextendsStrategy{ defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ caselogical.Limit(IntegerLiteral(limit),logical.Sort(order,child))=> execution.TakeOrdered(limit,order,planLater(child))(sqlContext)::Nil case_=>Nil } } 3.7、ParquetOperations 支持ParquetOperations的读写,插入Table等。 [java] view plain copy objectParquetOperationsextendsStrategy{ defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ //TODO:needtosupportwritingtoothertypesoffiles.Unifythebelowcodepaths. caselogical.WriteToFile(path,child)=> valrelation= ParquetRelation.create(path,child,sparkContext.hadoopConfiguration) //Note:overwrite=falsebecauseotherwisethemetadatawejustcreatedwillbedeleted InsertIntoParquetTable(relation,planLater(child),overwrite=false)(sqlContext)::Nil caselogical.InsertIntoTable(table:ParquetRelation,partition,child,overwrite)=> InsertIntoParquetTable(table,planLater(child),overwrite)(sqlContext)::Nil casePhysicalOperation(projectList,filters:Seq[Expression],relation:ParquetRelation)=> valprunePushedDownFilters= if(sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED,true)){ (filters:Seq[Expression])=>{ filters.filter{filter=> //Note:filterscannotbepusheddowntoParquetiftheycontainmorecomplex //expressionsthansimple"AttributecmpLiteral"comparisons.Hereweremove //allfiltersthathavebeenpusheddown.Notethatapredicatesuchas //"(AANDB)ORC"canresultin"AORC"beingpusheddown. valrecordFilter=ParquetFilters.createFilter(filter) if(!recordFilter.isDefined){ //Firstcase:thepushdowndidnotresultinanyrecordfilter. true }else{ //Secondcase:arecordfilterwascreated;hereweareconservativein //thesensethatevenif"A"waspushedandwecheckfor"AANDB"we //stillwanttokeep"AANDB"inthehigher-levelfilter,notjust"B". !ParquetFilters.findExpression(recordFilter.get,filter).isDefined } } } }else{ identity[Seq[Expression]]_ } pruneFilterProject( projectList, filters, prunePushedDownFilters, ParquetTableScan(_,relation,filters)(sqlContext))::Nil case_=>Nil } } 3.8、InMemoryScans InMemoryScans主要是对InMemoryRelation这个Logical Plan操作。 调用的其实是Spark Planner里的pruneFilterProject这个方法。 [java] view plain copy objectInMemoryScansextendsStrategy{ defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ casePhysicalOperation(projectList,filters,mem:InMemoryRelation)=> pruneFilterProject( projectList, filters, identity[Seq[Expression]],//Nofiltersarepusheddown. InMemoryColumnarTableScan(_,mem))::Nil case_=>Nil } } 3.9、BasicOperators 所有定义在org.apache.spark.sql.execution里的基本的Spark Plan,它们都在org.apache.spark.sql.execution包下basicOperators.scala内的 有Project、Filter、Sample、Union、Limit、TakeOrdered、Sort、ExistingRdd。 这些是基本元素,实现都相对简单,基本上都是RDD里的方法来实现的。 [java] view plain copy objectBasicOperatorsextendsStrategy{ defnumPartitions=self.numPartitions defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ caselogical.Distinct(child)=> execution.Aggregate( partial=false,child.output,child.output,planLater(child))(sqlContext)::Nil caselogical.Sort(sortExprs,child)=> //Thissortisaglobalsort.ItsrequiredDistributionwillbeanOrderedDistribution. execution.Sort(sortExprs,global=true,planLater(child))::Nil caselogical.SortPartitions(sortExprs,child)=> //Thissortonlysortstupleswithinapartition.ItsrequiredDistributionwillbe //anUnspecifiedDistribution. execution.Sort(sortExprs,global=false,planLater(child))::Nil caselogical.Project(projectList,child)=> execution.Project(projectList,planLater(child))::Nil caselogical.Filter(condition,child)=> execution.Filter(condition,planLater(child))::Nil caselogical.Aggregate(group,agg,child)=> execution.Aggregate(partial=false,group,agg,planLater(child))(sqlContext)::Nil caselogical.Sample(fraction,withReplacement,seed,child)=> execution.Sample(fraction,withReplacement,seed,planLater(child))::Nil caselogical.LocalRelation(output,data)=> valdataAsRdd= sparkContext.parallelize(data.map(r=> newGenericRow(r.productIterator.map(convertToCatalyst).toArray):Row)) execution.ExistingRdd(output,dataAsRdd)::Nil caselogical.Limit(IntegerLiteral(limit),child)=> execution.Limit(limit,planLater(child))(sqlContext)::Nil caseUnions(unionChildren)=> execution.Union(unionChildren.map(planLater))(sqlContext)::Nil caselogical.Generate(generator,join,outer,_,child)=> execution.Generate(generator,join=join,outer=outer,planLater(child))::Nil caselogical.NoRelation=> execution.ExistingRdd(Nil,singleRowRdd)::Nil caselogical.Repartition(expressions,child)=> execution.Exchange(HashPartitioning(expressions,numPartitions),planLater(child))::Nil caseSparkLogicalPlan(existingPlan,_)=>existingPlan::Nil case_=>Nil } } 3.10 CommandStrategy CommandStrategy是专门针对Command类型的Logical Plan 即set key = value 、 explain sql、 cache table xxx 这类操作 SetCommand主要实现方式是SparkContext的参数 ExplainCommand主要实现方式是利用executed Plan打印出tree string CacheCommand主要实现方式SparkContext的cache table和uncache table [java] view plain copy caseclassCommandStrategy(context:SQLContext)extendsStrategy{ defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ caselogical.SetCommand(key,value)=> Seq(execution.SetCommand(key,value,plan.output)(context)) caselogical.ExplainCommand(logicalPlan)=> Seq(execution.ExplainCommand(logicalPlan,plan.output)(context)) caselogical.CacheCommand(tableName,cache)=> Seq(execution.CacheCommand(tableName,cache)(context)) case_=>Nil } } 四、Execution Spark Plan的Execution方式均为调用其execute()方法生成RDD,除了简单的基本操作例如上面的basic operator实现比较简单,其它的实现都比较复杂,大致的实现我都在上面介绍了,本文就不详细讨论了。 五、总结 本文从介绍了Spark SQL的Catalyst框架的Physical plan以及其如何从Optimized Logical Plan转化为Spark Plan的过程,这个过程用到了很多的物理计划策略Strategies,每个Strategies最后还是在RuleExecutor里面被执行,最后生成一系列物理计划Executed Spark Plans。 Spark Plan是执行前最后一种计划,当生成executed spark plan后,就可以调用collect()方法来启动Spark Job来进行Spark SQL的真正执行了。 ——EOF—— 原创文章,转载请注明: 转载自:OopsOutOfMemory盛利的Blog,作者:OopsOutOfMemory 本文链接地址:http://blog.csdn.net/oopsoom/article/details/38235247 注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

优秀的个人博客,低调大师

Spark-SparkSQL深入学习系列五(转自OopsOutOfMemory)

/**Spark SQL源码分析系列文章*/ 前几篇文章介绍了SparkSQL的Catalyst的核心运行流程、SqlParser,和Analyzer以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识。 Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化,优化逻辑计划节点(Logical Plan)以及表达式(Expression),也是转换成物理执行计划的前置。如下图: 一、Optimizer Optimizer这个类是在catalyst里的optimizer包下的唯一一个类,Optimizer的工作方式其实类似Analyzer,因为它们都继承自RuleExecutor[LogicalPlan],都是执行一系列的Batch操作: Optimizer里的batches包含了3类优化策略:1、Combine Limits 合并Limits2、ConstantFolding 常量合并3、Filter Pushdown 过滤器下推,每个Batch里定义的优化伴随对象都定义在Optimizer里了: [java] view plain copy objectOptimizerextendsRuleExecutor[LogicalPlan]{ valbatches= Batch("CombineLimits",FixedPoint(100), CombineLimits):: Batch("ConstantFolding",FixedPoint(100), NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, SimplifyCaseConversionExpressions):: Batch("FilterPushdown",FixedPoint(100), CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, ColumnPruning)::Nil } 另外提一点,Optimizer里不但对Logical Plan进行了优化,而且对Logical Plan中的Expression也进行了优化,所以有必要了解一下Expression相关类,主要是用到了references和outputSet,references主要是Logical Plan或Expression节点的所依赖的那些Expressions,而outputSet是Logical Plan所有的Attribute的输出: 如:Aggregate是一个Logical Plan, 它的references就是group by的表达式 和 aggreagate的表达式的并集去重。 [java] view plain copy caseclassAggregate( groupingExpressions:Seq[Expression], aggregateExpressions:Seq[NamedExpression], child:LogicalPlan) extendsUnaryNode{ overridedefoutput=aggregateExpressions.map(_.toAttribute) overridedefreferences= (groupingExpressions++aggregateExpressions).flatMap(_.references).toSet } 二、优化策略详解 Optimizer的优化策略不仅有对plan进行transform的,也有对expression进行transform的,究其原理就是遍历树,然后应用优化的Rule,但是注意一点,对Logical Plantransfrom的是 先序遍历(pre-order),而对Expression transfrom的时候是 后序遍历(post-order): 2.1、Batch:Combine Limits 如果出现了2个Limit,则将2个Limit合并为一个,这个要求一个Limit是另一个Limit的grandChild。 [java] view plain copy /** *Combinestwoadjacent[[Limit]]operatorsintoone,mergingthe *expressionsintoonesingleexpression. */ objectCombineLimitsextendsRule[LogicalPlan]{ defapply(plan:LogicalPlan):LogicalPlan=plantransform{ casell@Limit(le,nl@Limit(ne,grandChild))=>//ll为当前Limit,le为其expression,nl是ll的grandChild,ne是nl的expression Limit(If(LessThan(ne,le),ne,le),grandChild)//expression比较,如果ne比le小则表达式为ne,否则为le } } 给定SQL:val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ") [java] view plain copy scala>query.queryExecution.analyzed res12:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Limit10 Project[key#13,value#14] Limit100 Project[key#13,value#14] MetastoreRelationdefault,temp_shengli,None 子查询里limit100,外层查询limit10,这里我们当然可以在子查询里不必查那么多,因为外层只需要10个,所以这里会合并Limit10,和Limit100 为 Limit 10。 2.2、Batch:ConstantFolding 这个Batch里包含了Rules:NullPropagation,ConstantFolding,BooleanSimplification,SimplifyFilters,SimplifyCasts,SimplifyCaseConversionExpressions。 2.2.1、Rule:NullPropagation 这里先提一下Literal字面量,它其实是一个能匹配任意基本类型的类。(为下文做铺垫) [java] view plain copy objectLiteral{ defapply(v:Any):Literal=vmatch{ casei:Int=>Literal(i,IntegerType) casel:Long=>Literal(l,LongType) cased:Double=>Literal(d,DoubleType) casef:Float=>Literal(f,FloatType) caseb:Byte=>Literal(b,ByteType) cases:Short=>Literal(s,ShortType) cases:String=>Literal(s,StringType) caseb:Boolean=>Literal(b,BooleanType) cased:BigDecimal=>Literal(d,DecimalType) caset:Timestamp=>Literal(t,TimestampType) casea:Array[Byte]=>Literal(a,BinaryType) casenull=>Literal(null,NullType) } } 注意Literal是一个LeafExpression,核心方法是eval,给定Row,计算表达式返回值: [java] view plain copy caseclassLiteral(value:Any,dataType:DataType)extendsLeafExpression{ overridedeffoldable=true defnullable=value==null defreferences=Set.empty overridedeftoString=if(value!=null)value.toStringelse"null" typeEvaluatedType=Any overridedefeval(input:Row):Any=value } 现在来看一下NullPropagation都做了什么。 NullPropagation是一个能将Expression Expressions替换为等价的Literal值的优化,并且能够避免NULL值在SQL语法树的传播。 [java] view plain copy /** *Replaces[[ExpressionExpressions]]thatcanbestaticallyevaluatedwith *equivalent[[Literal]]values.Thisruleismorespecificwith *Nullvaluepropagationfrombottomtotopoftheexpressiontree. */ objectNullPropagationextendsRule[LogicalPlan]{ defapply(plan:LogicalPlan):LogicalPlan=plantransform{ caseq:LogicalPlan=>qtransformExpressionsUp{ casee@Count(Literal(null,_))=>Cast(Literal(0L),e.dataType)//如果count(null)则转化为count(0) casee@Sum(Literal(c,_))ifc==0=>Cast(Literal(0L),e.dataType)<spanstyle="font-family:Arial;">//如果sum(null)则转化为sum(0)</span> casee@Average(Literal(c,_))ifc==0=>Literal(0.0,e.dataType) casee@IsNull(c)if!c.nullable=>Literal(false,BooleanType) casee@IsNotNull(c)if!c.nullable=>Literal(true,BooleanType) casee@GetItem(Literal(null,_),_)=>Literal(null,e.dataType) casee@GetItem(_,Literal(null,_))=>Literal(null,e.dataType) casee@GetField(Literal(null,_),_)=>Literal(null,e.dataType) casee@Coalesce(children)=>{ valnewChildren=children.filter(c=>cmatch{ caseLiteral(null,_)=>false case_=>true }) if(newChildren.length==0){ Literal(null,e.dataType) }elseif(newChildren.length==1){ newChildren(0) }else{ Coalesce(newChildren) } } casee@If(Literal(v,_),trueValue,falseValue)=>if(v==true)trueValueelsefalseValue casee@In(Literal(v,_),list)if(list.exists(c=>cmatch{ caseLiteral(candidate,_)ifcandidate==v=>true case_=>false }))=>Literal(true,BooleanType) //Putexceptionalcasesaboveifany casee:BinaryArithmetic=>e.childrenmatch{ caseLiteral(null,_)::right::Nil=>Literal(null,e.dataType) caseleft::Literal(null,_)::Nil=>Literal(null,e.dataType) case_=>e } casee:BinaryComparison=>e.childrenmatch{ caseLiteral(null,_)::right::Nil=>Literal(null,e.dataType) caseleft::Literal(null,_)::Nil=>Literal(null,e.dataType) case_=>e } casee:StringRegexExpression=>e.childrenmatch{ caseLiteral(null,_)::right::Nil=>Literal(null,e.dataType) caseleft::Literal(null,_)::Nil=>Literal(null,e.dataType) case_=>e } } } } 给定SQL:val query = sql("select count(null) from temp_shengli where key is not null") [java] view plain copy scala>query.queryExecution.analyzed res6:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Aggregate[],[COUNT(null)ASc0#5L]//这里count的是null FilterISNOTNULLkey#7 MetastoreRelationdefault,temp_shengli,None 调用NullPropagation [java] view plain copy scala>NullPropagation(query.queryExecution.analyzed) res7:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Aggregate[],[CAST(0,LongType)ASc0#5L]//优化后为0了 FilterISNOTNULLkey#7 MetastoreRelationdefault,temp_shengli,None 2.2.2、Rule:ConstantFolding 常量合并是属于Expression优化的一种,对于可以直接计算的常量,不用放到物理执行里去生成对象来计算了,直接可以在计划里就计算出来: [java] view plain copy objectConstantFoldingextendsRule[LogicalPlan]{ defapply(plan:LogicalPlan):LogicalPlan=plantransform{//先对plan进行transform caseq:LogicalPlan=>qtransformExpressionsDown{//对每个plan的expression进行transform //Skipredundantfoldingofliterals. casel:Literal=>l caseeife.foldable=>Literal(e.eval(null),e.dataType)//调用eval方法计算结果 } } } 给定SQL:val query = sql("select 1+2+3+4 from temp_shengli") [java] view plain copy scala>query.queryExecution.analyzed res23:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[(((1+2)+3)+4)ASc0#21]//这里还是常量表达式 MetastoreRelationdefault,src,None 优化后: [java] view plain copy scala>query.queryExecution.optimizedPlan res24:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[10ASc0#21]//优化后,直接合并为10 MetastoreRelationdefault,src,None 2.2.3、BooleanSimplification 这个是对布尔表达式的优化,有点像Java布尔表达式中的短路判断,不过这个写的倒是很优雅。 看看布尔表达式2边能不能通过只计算1边,而省去计算另一边而提高效率,称为简化布尔表达式。 解释请看我写的注释: [java] view plain copy /** *Simplifiesbooleanexpressionswheretheanswercanbedeterminedwithoutevaluatingbothsides. *Notethatthisrulecaneliminateexpressionsthatmightotherwisehavebeenevaluatedandthus *isonlysafewhenevaluationsofexpressionsdoesnotresultinsideeffects. */ objectBooleanSimplificationextendsRule[LogicalPlan]{ defapply(plan:LogicalPlan):LogicalPlan=plantransform{ caseq:LogicalPlan=>qtransformExpressionsUp{ caseand@And(left,right)=>//如果布尔表达式是AND操作,即exp1andexp2 (left,right)match{//(左边表达式,右边表达式) case(Literal(true,BooleanType),r)=>r//左边true,返回右边的<spanstyle="font-family:Arial;">bool</span><spanstyle="font-family:Arial;">值</span> case(l,Literal(true,BooleanType))=>l//右边true,返回左边的bool值 case(Literal(false,BooleanType),_)=>Literal(false)//左边都false,右边随便,反正是返回false case(_,Literal(false,BooleanType))=>Literal(false)//只要有1边是false了,都是false case(_,_)=>and } caseor@Or(left,right)=> (left,right)match{ case(Literal(true,BooleanType),_)=>Literal(true)//只要左边是true了,不用判断右边都是true case(_,Literal(true,BooleanType))=>Literal(true)//只要有一边是true,都返回true case(Literal(false,BooleanType),r)=>r//希望右边r是true case(l,Literal(false,BooleanType))=>l case(_,_)=>or } } } } 2.3 Batch:Filter Pushdown Filter Pushdown下包含了CombineFilters、PushPredicateThroughProject、PushPredicateThroughJoin、ColumnPruning Ps:感觉Filter Pushdown的名字起的有点不能涵盖全部比如ColumnPruning列裁剪。 2.3.1、Combine Filters 合并两个相邻的Filter,这个和上述Combine Limit差不多。合并2个节点,就可以 减少树的深度从而减少重复执行过滤的代价。 [java] view plain copy /** *Combinestwoadjacent[[Filter]]operatorsintoone,mergingthe *conditionsintooneconjunctivepredicate. */ objectCombineFiltersextendsRule[LogicalPlan]{ defapply(plan:LogicalPlan):LogicalPlan=plantransform{ caseff@Filter(fc,nf@Filter(nc,grandChild))=>Filter(And(nc,fc),grandChild) } } 给定SQL:val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ") 优化前:我们看到一个filter 是另一个filter的grandChild [java] view plain copy scala>query.queryExecution.analyzed res25:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[key#27] Filter(key#27>80)//filter>80 Project[key#27] Filter(key#27>100)//filter>100 MetastoreRelationdefault,src,None 优化后:其实filter也可以表达为一个复杂的boolean表达式 [java] view plain copy scala>query.queryExecution.optimizedPlan res26:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[key#27] Filter((key#27>100)&&(key#27>80))//合并为1个 MetastoreRelationdefault,src,None 2.3.2 Filter Pushdown Filter Pushdown,过滤器下推。 原理就是更早的过滤掉不需要的元素来减少开销。 给定SQL:val query = sql("select key from (select * from temp_shengli)a where key>100") 生成的逻辑计划为: [java] view plain copy scala>scala>query.queryExecution.analyzed res29:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[key#31] Filter(key#31>100)//先selectkey,value,然后再Filter Project[key#31,value#32] MetastoreRelationdefault,src,None 优化后的计划为: [java] view plain copy query.queryExecution.optimizedPlan res30:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[key#31] Filter(key#31>100)//先filter,然后再select MetastoreRelationdefault,src,None 2.3.3、ColumnPruning 列裁剪用的比较多,就是减少不必要select的某些列。 列裁剪在3种地方可以用: 1、在聚合操作中,可以做列裁剪 2、在join操作中,左右孩子可以做列裁剪 3、合并相邻的Project的列 [java] view plain copy objectColumnPruningextendsRule[LogicalPlan]{ defapply(plan:LogicalPlan):LogicalPlan=plantransform{ //Eliminateattributesthatarenotneededtocalculatethespecifiedaggregates. casea@Aggregate(_,_,child)if(child.outputSet--a.references).nonEmpty=>////如果project的outputSet中减去a.references的元素如果不同,那么就将Aggreagte的child替换为a.references a.copy(child=Project(a.references.toSeq,child)) //EliminateunneededattributesfromeithersideofaJoin. caseProject(projectList,Join(left,right,joinType,condition))=>//消除join的left和right孩子的不必要属性,将join的左右子树的列进行裁剪 //Collectthelistofoffreferencesrequiredeitheraboveortoevaluatethecondition. valallReferences:Set[Attribute]= projectList.flatMap(_.references).toSet++condition.map(_.references).getOrElse(Set.empty) /**Appliesaprojectiononlywhenthechildisproducingunnecessaryattributes*/ defprunedChild(c:LogicalPlan)= if((c.outputSet--allReferences.filter(c.outputSet.contains)).nonEmpty){ Project(allReferences.filter(c.outputSet.contains).toSeq,c) }else{ c } Project(projectList,Join(prunedChild(left),prunedChild(right),joinType,condition)) //CombineadjacentProjects. caseProject(projectList1,Project(projectList2,child))=>//合并相邻Project的列 //CreateamapofAliasestotheirvaluesfromthechildprojection. //e.g.,'SELECT...FROM(SELECTa+bASc,d...)'producesMap(c->Alias(a+b,c)). valaliasMap=projectList2.collect{ casea@Alias(e,_)=>(a.toAttribute:Expression,a) }.toMap //Substituteanyattributesthatareproducedbythechildprojection,sothatwesafely //eliminateit. //e.g.,'SELECTc+1FROM(SELECTa+bASC...'produces'SELECTa+b+1...' //TODO:FixTransformBasetoavoidthecastbelow. valsubstitutedProjection=projectList1.map(_.transform{ caseaifaliasMap.contains(a)=>aliasMap(a) }).asInstanceOf[Seq[NamedExpression]] Project(substitutedProjection,child) //Eliminateno-opProjects caseProject(projectList,child)ifchild.output==projectList=>child } } 分别举三个例子来对应三种情况进行说明: 1、在聚合操作中,可以做列裁剪 给定SQL:val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key") 优化前: [java] view plain copy res57:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Aggregate[key#51],[(1+1)ASshengli#49,key#51] Project[key#51,value#52]//优化前默认selectkey和value两列 MetastoreRelationdefault,temp_shengli,None 优化后: [java] view plain copy scala>ColumnPruning1(query.queryExecution.analyzed) MetastoreRelationdefault,temp_shengli,None res59:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Aggregate[key#51],[(1+1)ASshengli#49,key#51] Project[key#51]//优化后,列裁剪掉了value,只selectkey MetastoreRelationdefault,temp_shengli,None 2、 在join操作中,左右孩子可以做列裁剪 给定SQL:val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b on a.key =b.key ") 没有优化之前: [java] view plain copy scala>query.queryExecution.analyzed res51:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[value#42ASqween#39] JoinInner,Some((key#41=key#43)) Project[key#41,value#42]//这里多select了一列,即value MetastoreRelationdefault,temp_shengli,None Project[key#43,value#44]//这里多select了一列,即value MetastoreRelationdefault,temp_shengli,None 优化后:(ColumnPruning2是我自己调试用的) [java] view plain copy scala>ColumnPruning2(query.queryExecution.analyzed) allReferencesis->Set(key#35,key#37) MetastoreRelationdefault,temp_shengli,None MetastoreRelationdefault,temp_shengli,None res47:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[key#35ASqween#33] JoinInner,Some((key#35=key#37)) Project[key#35]//经过列裁剪之后,leftChild只需要selectkey这一个列 MetastoreRelationdefault,temp_shengli,None Project[key#37]//经过列裁剪之后,rightChild只需要selectkey这一个列 MetastoreRelationdefault,temp_shengli,None 3、 合并相邻的Project的列,裁剪 给定SQL:val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a ") 优化前: [java] view plain copy scala>query.queryExecution.analyzed res61:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[(c#56+1)ASc0#57] Project[(1+1)ASc#56] MetastoreRelationdefault,temp_shengli,None 优化后: [java] view plain copy scala>query.queryExecution.optimizedPlan res62:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[(2ASc#56+1)ASc0#57]//将子查询里的c代入到外层select里的c,直接计算结果 MetastoreRelationdefault,temp_shengli,None 三、总结: 本文介绍了Optimizer在Catalyst里的作用即将Analyzed Logical Plan 经过对Logical Plan和Expression进行Rule的应用transfrom,从而达到树的节点进行合并和优化。其中主要的优化的策略总结起来是合并、列裁剪、过滤器下推几大类。 Catalyst应该在不断迭代中,本文只是基于spark1.0.0进行研究,后续如果新加入的优化策略也会在后续补充进来。 欢迎大家讨论,共同进步! ——EOF—— 原创文章,转载请注明: 转载自:OopsOutOfMemory盛利的Blog,作者:OopsOutOfMemory 本文链接地址:http://blog.csdn.net/oopsoom/article/details/38121259 注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

优秀的个人博客,低调大师

Spark-SparkSQL深入学习系列四(转自OopsOutOfMemory)

/**Spark SQL源码分析系列文章*/ 前几篇文章介绍了SparkSQL的Catalyst的核心运行流程、SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释。 一、TreeNode类型 TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个TreeNode组成。TreeNode本身是一个BaseType <: TreeNode[BaseType] 的类型,并且实现了Product这个trait,这样可以存放异构的元素了。 TreeNode有三种形态: BinaryNode 、 UnaryNode 、 Leaf Node . 在Catalyst里,这些Node都是继承自Logical Plan,可以说每一个TreeNode节点就是一个Logical Plan(包含Expression)(直接继承自TreeNode) 主要继承关系类图如下: 1、BinaryNode 二元节点,即有左右孩子的二叉节点 [java] view plain copy [[TreeNode]]thathastwochildren,[[left]]and[[right]]. traitBinaryNode[BaseType<:TreeNode[BaseType]]{ defleft:BaseType defright:BaseType defchildren=Seq(left,right) } abstractclassBinaryNodeextendsLogicalPlanwithtrees.BinaryNode[LogicalPlan]{ self:Product=> } 节点定义比较简单,左孩子,右孩子都是BaseType。 children是一个Seq(left, right) 下面列出主要继承二元节点的类,可以当查询手册用 :) 这里提示下平常常用的二元节点:Join和Union 2、UnaryNode 一元节点,即只有一个孩子节点 [java] view plain copy A[[TreeNode]]withasingle[[child]]. traitUnaryNode[BaseType<:TreeNode[BaseType]]{ defchild:BaseType defchildren=child::Nil } abstractclassUnaryNodeextendsLogicalPlanwithtrees.UnaryNode[LogicalPlan]{ self:Product=> } 下面列出主要继承一元节点的类,可以当查询手册用 :) 常用的二元节点有,Project,Subquery,Filter,Limit...等 3、Leaf Node 叶子节点,没有孩子节点的节点。 [java] view plain copy A[[TreeNode]]withnochildren. traitLeafNode[BaseType<:TreeNode[BaseType]]{ defchildren=Nil } abstractclassLeafNodeextendsLogicalPlanwithtrees.LeafNode[LogicalPlan]{ self:Product=> //Leafnodesbydefinitioncannotreferenceanyinputattributes. overridedefreferences=Set.empty } 下面列出主要继承叶子节点的类,可以当查询手册用 :) 提示常用的叶子节点:Command类系列,一些Funtion函数,以及Unresolved Relation...etc. 二、TreeNode 核心方法 简单介绍一个TreeNode这个类的属性和方法 currentId 一颗树里的TreeNode有个唯一的id,类型是Java.util.concurrent.atomic.AtomicLong原子类型。 [java] view plain copy privatevalcurrentId=newjava.util.concurrent.atomic.AtomicLong protecteddefnextId()=currentId.getAndIncrement() sameInstance 判断2个实例是否是同一个的时候,只需要判断TreeNode的id。 [java] view plain copy defsameInstance(other:TreeNode[_]):Boolean={ this.id==other.id } fastEquals ,更常用的一个快捷的判定方法,没有重写Object.Equals,这样防止scala编译器生成case class equals 方法 [java] view plain copy deffastEquals(other:TreeNode[_]):Boolean={ sameInstance(other)||this==other } map,flatMap,collect都是递归的对子节点进行应用PartialFunction,其它方法还有很多,篇幅有限这里不一一描述了。 2.1、核心方法 transform 方法 transform该方法接受一个PartialFunction,就是就是前一篇文章Analyzer里提到的Batch里面的Rule。 是会将Rule迭代应用到该节点的所有子节点,最后返回这个节点的副本(一个和当前节点不同的节点,后面会介绍,其实就是利用反射来返回一个修改后的节点)。 如果rule没有对一个节点进行PartialFunction的操作,就返回这个节点本身。 来看一个例子: [java] view plain copy objectGlobalAggregatesextendsRule[LogicalPlan]{ defapply(plan:LogicalPlan):LogicalPlan=plantransform{//apply方法这里调用了logicalplan(TreeNode)的transform方法来应用一个PartialFunction。 caseProject(projectList,child)ifcontainsAggregates(projectList)=> Aggregate(Nil,projectList,child) } defcontainsAggregates(exprs:Seq[Expression]):Boolean={ exprs.foreach(_.foreach{ caseagg:AggregateExpression=>returntrue case_=> }) false } } 这个方法真正的调用是transformChildrenDown,这里提到了用先序遍历来对子节点进行递归的Rule应用。 如果在对当前节点应用rule成功,修改后的节点afterRule,来对其children节点进行rule的应用。 transformDown方法: [java] view plain copy /** *Returnsacopyofthisnodewhere`rule`hasbeenrecursivelyappliedtoitandallofits *children(pre-order).When`rule`doesnotapplytoagivennodeitisleftunchanged. *@paramrulethefunctionusedtotransformthisnodeschildren */ eftransformDown(rule:PartialFunction[BaseType,BaseType]):BaseType={ valafterRule=rule.applyOrElse(this,identity[BaseType]) //Checkifunchangedandthenpossiblyreturnoldcopytoavoidgcchurn. if(thisfastEqualsafterRule){ transformChildrenDown(rule)//修改前节点this.transformChildrenDown(rule) }else{ afterRule.transformChildrenDown(rule)//修改后节点进行transformChildrenDown } 最重要的方法transformChildrenDown: 对children节点进行递归的调用PartialFunction,利用最终返回的newArgs来生成一个新的节点,这里调用了makeCopy()来生成节点。 transformChildrenDown方法: [java] view plain copy /** *Returnsacopyofthisnodewhere`rule`hasbeenrecursivelyappliedtoallthechildrenof *thisnode.When`rule`doesnotapplytoagivennodeitisleftunchanged. *@paramrulethefunctionusedtotransformthisnodeschildren */ deftransformChildrenDown(rule:PartialFunction[BaseType,BaseType]):this.type={ varchanged=false valnewArgs=productIterator.map{ casearg:TreeNode[_]ifchildrencontainsarg=> valnewChild=arg.asInstanceOf[BaseType].transformDown(rule)//递归子节点应用rule if(!(newChildfastEqualsarg)){ changed=true newChild }else{ arg } caseSome(arg:TreeNode[_])ifchildrencontainsarg=> valnewChild=arg.asInstanceOf[BaseType].transformDown(rule) if(!(newChildfastEqualsarg)){ changed=true Some(newChild) }else{ Some(arg) } casem:Map[_,_]=>m caseargs:Traversable[_]=>args.map{ casearg:TreeNode[_]ifchildrencontainsarg=> valnewChild=arg.asInstanceOf[BaseType].transformDown(rule) if(!(newChildfastEqualsarg)){ changed=true newChild }else{ arg } caseother=>other } casenonChild:AnyRef=>nonChild casenull=>null }.toArray if(changed)makeCopy(newArgs)elsethis//根据作用结果返回的newArgs数组,反射生成新的节点副本。 } makeCopy方法,反射生成节点副本 [java] view plain copy /** *Createsacopyofthistypeoftreenodeafteratransformation. *Mustbeoverriddenbychildclassesthathaveconstructorarguments *thatarenotpresentintheproductIterator. *@paramnewArgsthenewproductarguments. */ defmakeCopy(newArgs:Array[AnyRef]):this.type=attachTree(this,"makeCopy"){ try{ valdefaultCtor=getClass.getConstructors.head//反射获取默认构造函数的第一个 if(otherCopyArgs.isEmpty){ defaultCtor.newInstance(newArgs:_*).asInstanceOf[this.type]//反射生成当前节点类型的节点 }else{ defaultCtor.newInstance((newArgs++otherCopyArgs).toArray:_*).asInstanceOf[this.type]//如果还有其它参数,++ } }catch{ casee:java.lang.IllegalArgumentException=> thrownewTreeNodeException( this,s"Failedtocopynode.IsotherCopyArgsspecifiedcorrectlyfor$nodeName?" +s"Exceptionmessage:${e.getMessage}.") } } 三、TreeNode实例 现在准备从一段sql来出发,画一下这个spark sql的整体树的transformation。 SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key 首先,我们先执行一下,在控制台里看一下生成的计划: [java] view plain copy <spanstyle="font-size:12px;">sbt/sbthive/console Using/usr/java/defaultasdefaultJAVA_HOME. Note,thiswillbeoverriddenby-java-homeifitisset. [info]Loadingprojectdefinitionfrom/app/hadoop/shengli/spark/project/project [info]Loadingprojectdefinitionfrom/app/hadoop/shengli/spark/project [info]Setcurrentprojecttoroot(inbuildfile:/app/hadoop/shengli/spark/) [info]Startingscalainterpreter... [info] importorg.apache.spark.sql.catalyst.analysis._ importorg.apache.spark.sql.catalyst.dsl._ importorg.apache.spark.sql.catalyst.errors._ importorg.apache.spark.sql.catalyst.expressions._ importorg.apache.spark.sql.catalyst.plans.logical._ importorg.apache.spark.sql.catalyst.rules._ importorg.apache.spark.sql.catalyst.types._ importorg.apache.spark.sql.catalyst.util._ importorg.apache.spark.sql.execution importorg.apache.spark.sql.hive._ importorg.apache.spark.sql.hive.test.TestHive._ importorg.apache.spark.sql.parquet.ParquetTestData scala>valquery=sql("SELECT*FROM(SELECT*FROMsrc)ajoin(select*fromsrc)bona.key=b.key")</span> 3.1、UnResolve Logical Plan 第一步生成UnResolve Logical Plan 如下: [java] view plain copy scala>query.queryExecution.logical res0:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[*] JoinInner,Some(('a.key='b.key)) Subquerya Project[*] UnresolvedRelationNone,src,None Subqueryb Project[*] UnresolvedRelationNone,src,None 如果画成树是这样的,仅个人理解: 我将一开始介绍的三种Node分别用绿色UnaryNode,红色Binary Node 和 蓝色 LeafNode 来表示。 3.2、Analyzed Logical Plan Analyzer会将允用Batch的Rules来对Unresolved Logical Plan Tree 进行rule应用,这里用来EliminateAnalysisOperators将Subquery给消除掉,Batch("Resolution将Atrribute和Relation给Resolve了,Analyzed Logical Plan Tree如下图: 3.3、Optimized Plan 我把Catalyst里的Optimizer戏称为Spark SQL的优化大师,因为整个Spark SQL的优化都是在这里进行的,后面会有文章来讲解Optimizer。 在这里,优化的不明显,因为SQL本身不复杂 [java] view plain copy scala>query.queryExecution.optimizedPlan res3:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[key#0,value#1,key#2,value#3] JoinInner,Some((key#0=key#2)) MetastoreRelationdefault,src,None MetastoreRelationdefault,src,None 生成的树如下图: 3.4、executedPlan 最后一步是最终生成的物理执行计划,里面涉及到了Hive的TableScan,涉及到了HashJoin操作,还涉及到了Exchange,Exchange涉及到了Shuffle和Partition操作。 [java] view plain copy scala>query.queryExecution.executedPlan res4:org.apache.spark.sql.execution.SparkPlan= Project[key#0:0,value#1:1,key#2:2,value#3:3] HashJoin[key#0],[key#2],BuildRight Exchange(HashPartitioning[key#0:0],150) HiveTableScan[key#0,value#1],(MetastoreRelationdefault,src,None),None Exchange(HashPartitioning[key#2:0],150) HiveTableScan[key#2,value#3],(MetastoreRelationdefault,src,None),None 生成的物理执行树如图: 四、总结: 本文介绍了Spark SQL的Catalyst框架核心TreeNode类库,绘制了TreeNode继承关系的类图,了解了TreeNode这个类在Catalyst所起到的作用。语法树中的Logical Plan均派生自TreeNode,并且Logical Plan派生出TreeNode的三种形态,即Binary Node, Unary Node, Leaft Node。 正式这几种节点,组成了Spark SQl的Catalyst的语法树。 TreeNode的transform方法是核心的方法,它接受一个rule,会对当前节点的孩子节点进行递归的调用rule,最后会返回一个TreeNode的copy,这种操作就是transformation,贯穿了Spark SQL执行的几个核心阶段,如Analyze,Optimize阶段。 最后用一个实际的例子,展示出来Spark SQL的执行树生成流程。 我目前的理解就是这些,如果分析不到位的地方,请大家多多指正。 ——EOF—— 原创文章,转载请注明: 转载自:OopsOutOfMemory盛利的Blog,作者:OopsOutOfMemory 本文链接地址:http://blog.csdn.net/oopsoom/article/details/38084079 注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

优秀的个人博客,低调大师

Spark-SparkSQL深入学习系列八(转自OopsOutOfMemory)

/**Spark SQL源码分析系列文章*/ 在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准。 在前面Spark SQL源码分析之核心流程一文中,已经介绍了SparkSQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能。但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF: spark1.0及以前的实现: [java] view plain copy protected[sql]lazyvalcatalog:Catalog=newSimpleCatalog @transient protected[sql]lazyvalanalyzer:Analyzer= newAnalyzer(catalog,EmptyFunctionRegistry,caseSensitive=true)//EmptyFunctionRegistry空实现 @transient protected[sql]valoptimizer=Optimizer Spark1.1及以后的实现: [java] view plain copy protected[sql]lazyvalfunctionRegistry:FunctionRegistry=newSimpleFunctionRegistry//SimpleFunctionRegistry实现,支持简单的UDF @transient protected[sql]lazyvalanalyzer:Analyzer= newAnalyzer(catalog,functionRegistry,caseSensitive=true) 一、引子: 对于SQL语句中的函数,会经过SqlParser的的解析成UnresolvedFunction。UnresolvedFunction最后会被Analyzer解析。 SqlParser: 除了非官方定义的函数外,还可以定义自定义函数,sql parser会进行解析。 [java] view plain copy ident~"("~repsep(expression,",")<~")"^^{ caseudfName~_~exprs=>UnresolvedFunction(udfName,exprs) 将SqlParser传入的udfName和exprs封装成一个class class UnresolvedFunction继承自Expression。 只是这个Expression的dataType等一系列属性和eval计算方法均无法访问,强制访问会抛出异常,因为它没有被Resolved,只是一个载体。 [java] view plain copy caseclassUnresolvedFunction(name:String,children:Seq[Expression])extendsExpression{ overridedefdataType=thrownewUnresolvedException(this,"dataType") overridedeffoldable=thrownewUnresolvedException(this,"foldable") overridedefnullable=thrownewUnresolvedException(this,"nullable") overridelazyvalresolved=false //Unresolvedfunctionsaretransientatcompiletimeanddon'tgetevaluatedduringexecution. overridedefeval(input:Row=null):EvaluatedType= thrownewTreeNodeException(this,s"Nofunctiontoevaluateexpression.type:${this.nodeName}") overridedeftoString=s"'$name(${children.mkString(",")})" }<strong></strong> Analyzer: Analyzer初始化的时候会需要Catalog,database和table的元数据关系,以及FunctionRegistry来维护UDF名称和UDF实现的元数据,这里使用SimpleFunctionRegistry。 [java] view plain copy /** *Replaces[[UnresolvedFunction]]swithconcrete[[catalyst.expressions.ExpressionExpressions]]. */ objectResolveFunctionsextendsRule[LogicalPlan]{ defapply(plan:LogicalPlan):LogicalPlan=plantransform{ caseq:LogicalPlan=> qtransformExpressions{//对当前LogicalPlan进行transformExpressions操作 caseu@UnresolvedFunction(name,children)ifu.childrenResolved=>//如果遍历到了UnresolvedFunction registry.lookupFunction(name,children)//从UDF元数据表里查找udf函数 } } } 二、UDF注册 2.1UDFRegistration registerFunction("len", (x:String)=>x.length) registerFunction是UDFRegistration下的方法,SQLContext现在实现了UDFRegistration这个trait,只要导入SQLContext,即可以使用udf功能。 UDFRegistration核心方法registerFunction: registerFunction方法签名def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit 接受一个udfName 和 一个FunctionN,可以是Function1 到Function22。即这个udf的参数只支持1-22个。(scala的痛啊) 内部builder通过ScalaUdf来构造一个Expression,这里ScalaUdf继承自Expression(可以简单的理解目前的SimpleUDF即是一个Catalyst的一个Expression),传入scala的function作为UDF的实现,并且用反射检查字段类型是否是Catalyst允许的,见ScalaReflection. [java] view plain copy defregisterFunction[T:TypeTag](name:String,func:Function1[_,T]):Unit={ defbuilder(e:Seq[Expression])=ScalaUdf(func,ScalaReflection.schemaFor(typeTag[T]).dataType,e)//构造Expression functionRegistry.registerFunction(name,builder)//向SQLContext的functionRegistry(维护了一个hashMap来管理udf映射)注册 2.2 注册Function: 注意:这里FunctionBuilder是一个type FunctionBuilder = Seq[Expression] => Expression [java] view plain copy classSimpleFunctionRegistryextendsFunctionRegistry{ valfunctionBuilders=newmutable.HashMap[String,FunctionBuilder]()//udf映射关系维护[udfName,Expression] defregisterFunction(name:String,builder:FunctionBuilder)={//putexpression进Map functionBuilders.put(name,builder) } overridedeflookupFunction(name:String,children:Seq[Expression]):Expression={ functionBuilders(name)(children)//查找udf,返回Expression } } 至此,我们将一个scala function注册为一个catalyst的一个Expression,这就是spark的simple udf。 三、UDF计算: UDF既然已经被封装为catalyst树里的一个Expression节点,那么计算的时候也就是计算ScalaUdf的eval方法。 先通过Row和表达式计算function所需要的参数,最后通过反射调用function,来达到计算udf的目的。 ScalaUdf继承自Expression: scalaUdf接受一个function, dataType,和一系列表达式。 比较简单,看注释即可: [java] view plain copy caseclassScalaUdf(function:AnyRef,dataType:DataType,children:Seq[Expression]) extendsExpression{ typeEvaluatedType=Any defnullable=true overridedeftoString=s"scalaUDF(${children.mkString(",")})" overridedefeval(input:Row):Any={ valresult=children.sizematch{ case0=>function.asInstanceOf[()=>Any]() case1=>function.asInstanceOf[(Any)=>Any](children(0).eval(input))//反射调用function case2=> function.asInstanceOf[(Any,Any)=>Any]( children(0).eval(input),//表达式参数计算 children(1).eval(input)) case3=> function.asInstanceOf[(Any,Any,Any)=>Any]( children(0).eval(input), children(1).eval(input), children(2).eval(input)) case4=> ...... case22=>//scalafunction只支持22个参数,这里枚举了。 function.asInstanceOf[(Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any,Any)=>Any]( children(0).eval(input), children(1).eval(input), children(2).eval(input), children(3).eval(input), children(4).eval(input), children(5).eval(input), children(6).eval(input), children(7).eval(input), children(8).eval(input), children(9).eval(input), children(10).eval(input), children(11).eval(input), children(12).eval(input), children(13).eval(input), children(14).eval(input), children(15).eval(input), children(16).eval(input), children(17).eval(input), children(18).eval(input), children(19).eval(input), children(20).eval(input), children(21).eval(input)) 四、总结 Spark目前的UDF其实就是scala function。将scala function封装到一个Catalyst Expression当中,在进行sql计算时,使用同样的Eval方法对当前输入Row进行计算。 编写一个spark udf非常简单,只需给UDF起个函数名,并且传递一个scala function即可。依靠scala函数编程的表现能力,使得编写scala udf比较简单,且相较hive的udf更容易使人理解。 ——EOF—— 原创文章,转载请注明: 转载自:OopsOutOfMemory盛利的Blog,作者:OopsOutOfMemory 本文链接地址:http://blog.csdn.net/oopsoom/article/details/39395641 注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

优秀的个人博客,低调大师

Spark-SparkSQL深入学习系列九(转自OopsOutOfMemory)

/**Spark SQL源码分析系列文章*/ SparkSQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率。 这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage、Column Based Storage、 PAX Storage。 Spark SQL 的内存数据是如何组织的? Spark SQL 将数据加载到内存是以列的存储结构。称为In-Memory Columnar Storage。 若直接存储JavaObject 会产生很大的内存开销,并且这样是基于Row的存储结构。查询某些列速度略慢,虽然数据以及载入内存,查询效率还是低于面向列的存储结构。 基于Row的Java Object存储: 内存开销大,且容易FULL GC,按列查询比较慢。 基于Column的ByteBuffer存储(Spark SQL): 内存开销小,按列查询速度较快。 Spark SQL的In-Memory Columnar Storage是位于spark列下面org.apache.spark.sql.columnar包内: 核心的类有 ColumnBuilder, InMemoryColumnarTableScan, ColumnAccessor, ColumnType. 如果列有压缩的情况:compression包下面有具体的build列和access列的类。 一、引子 当我们调用spark sql 里的cache table command时,会生成一CacheCommand,这个Command是一个物理计划。 [java] view plain copy scala>valcached=sql("cachetablesrc") [java] view plain copy cached:org.apache.spark.sql.SchemaRDD= SchemaRDD[0]atRDDatSchemaRDD.scala:103 ==QueryPlan== ==PhysicalPlan== CacheCommandsrc,true 这里打印出来tableName是src, 和一个是否要cache的boolean flag. 我们看下CacheCommand的构造: CacheCommand支持2种操作,一种是把数据源加载带内存中,一种是将数据源从内存中卸载。 对应于SQLContext下的cacheTable和uncacheTabele。 [java] view plain copy caseclassCacheCommand(tableName:String,doCache:Boolean)(@transientcontext:SQLContext) extendsLeafNodewithCommand{ overrideprotected[sql]lazyvalsideEffectResult={ if(doCache){ context.cacheTable(tableName)//缓存表到内存 }else{ context.uncacheTable(tableName)//从内存中移除该表的数据 } Seq.empty[Any] } overridedefexecute():RDD[Row]={ sideEffectResult context.emptyResult } overridedefoutput:Seq[Attribute]=Seq.empty } 如果调用cached.collect(),则会根据Command命令来执行cache或者uncache操作,这里我们执行cache操作。 cached.collect()将会调用SQLContext下的cacheTable函数: 首先通过catalog查询关系,构造一个SchemaRDD。 [java] view plain copy /**ReturnsthespecifiedtableasaSchemaRDD*/ deftable(tableName:String):SchemaRDD= newSchemaRDD(this,catalog.lookupRelation(None,tableName)) 找到该Schema的analyzed计划。匹配构造InMemoryRelation: [java] view plain copy /**Cachesthespecifiedtablein-memory.*/ defcacheTable(tableName:String):Unit={ valcurrentTable=table(tableName).queryExecution.analyzed//构造schemaRDD并将其执行analyze计划操作 valasInMemoryRelation=currentTablematch{ case_:InMemoryRelation=>//如果已经是InMemoryRelation,则返回 currentTable.logicalPlan case_=>//如果不是(默认刚刚cache的时候是空的)则构建一个内存关系InMemoryRelation InMemoryRelation(useCompression,columnBatchSize,executePlan(currentTable).executedPlan) } //将构建好的InMemoryRelation注册到catalog里。 catalog.registerTable(None,tableName,asInMemoryRelation) } 二、InMemoryRelation InMemoryRelation继承自LogicalPlan,是Spark1.1 Spark SQL里新添加的一种TreeNode,也是catalyst里的一种plan. 现在TreeNode变成了4种: 1、BinaryNode 二元节点 2、LeafNode 叶子节点 3、UnaryNode 单孩子节点 4、InMemoryRelation 内存关系型节点 类图如下: 值得注意的是,_cachedColumnBuffers这个类型为RDD[Array[ByteBuffer]]的私有字段。 这个封装就是面向列的存储ByteBuffer。前面提到相较于plain java object存储记录,用ByteBuffer能显著的提高存储效率,减少内存占用。并且按列查询的速度会非常快。 InMemoryRelation具体实现如下: 构造一个InMemoryRelation需要该Relation的output Attributes,是否需要useCoompression来压缩,默认为false,一次处理的多少行数据batchSize, child 即SparkPlan。 [java] view plain copy private[sql]caseclassInMemoryRelation( output:Seq[Attribute],//输出属性,比如src表里就是[key,value] useCompression:Boolean,//操作时是否使用压缩,默认false batchSize:Int,//批的大小量 child:SparkPlan)//sparkplan具体child 可以通过设置: spark.sql.inMemoryColumnarStorage.compressed 为true来设置内存中的列存储是否需要压缩。 spark.sql.inMemoryColumnarStorage.batchSize 来设置一次处理多少row spark.sql.defaultSizeInBytes 来设置初始化的column的bufferbytes的默认大小,这里只是其中一个参数。 这些参数都可以在源码中设置,都在SQL Conf [java] view plain copy private[spark]objectSQLConf{ valCOMPRESS_CACHED="spark.sql.inMemoryColumnarStorage.compressed" valCOLUMN_BATCH_SIZE="spark.sql.inMemoryColumnarStorage.batchSize" valDEFAULT_SIZE_IN_BYTES="spark.sql.defaultSizeInBytes" 再回到case class InMemoryRelation: _cachedColumnBuffers就是我们最终将table放入内存的存储句柄,是一个RDD[Array[ByteBuffer]。 缓存主流程: 1、判断_cachedColumnBuffers是否为null,如果不是null,则已经Cache了当前table,重复cache不会触发cache操作。 2、child是SparkPlan,即执行hive table scan,测试我拿sbt/sbt hive/console里test里的src table为例,操作是扫描这张表。这个表有2个字的key是int, value 是string 3、拿到child的output, 这里的output就是 key, value2个列。 4、执行mapPartitions操作,对当前RDD的每个分区的数据进行操作。 5、对于每一个分区,迭代里面的数据生成新的Iterator。每个Iterator里面是Array[ByteBuffer] 6、对于child.output的每一列,都会生成一个ColumnBuilder,最后组合为一个columnBuilders是一个数组。 7、数组内每个CommandBuilder持有一个ByteBuffer 8、遍历原始分区的记录,将对于的行转为列,并将数据存到ByteBuffer内。 9、最后将此RDD调用cache方法,将RDD缓存。 10、将cached赋给_cachedColumnBuffers。 此操作总结下来是:执行hive table scan操作,返回的MapPartitionsRDD对其重新定义mapPartition方法,将其行转列,并且最终cache到内存中。 所有流程如下: [java] view plain copy //Ifthecachedcolumnbufferswerenotpassedin,wecalculatethemintheconstructor. //AsinSpark,theactualworkofcachingislazy. if(_cachedColumnBuffers==null){//判断是否已经cache了当前table valoutput=child.output /** *child.output res65:Seq[org.apache.spark.sql.catalyst.expressions.Attribute]=ArrayBuffer(key#6,value#7) */ valcached=child.execute().mapPartitions{baseIterator=> /** *child.execute()是Row的集合,迭代Row *res66:Array[org.apache.spark.sql.catalyst.expressions.Row]=Array([238,val_238]) * *valrow1=child.execute().take(1) *res67:Array[org.apache.spark.sql.catalyst.expressions.Row]=Array([238,val_238]) **/ /* *对每个Partition进行map,映射生成一个Iterator[Array[ByteBuffer],对应java的Iterator<List<ByteBuffer>> **/ newIterator[Array[ByteBuffer]]{ defnext()={ //遍历每一列,首先attribute是key为IntegerType,然后attribute是value是String //最后封装成一个Array,index0是IntColumnBuilder,1是StringColumnBuilder valcolumnBuilders=output.map{attribute=> valcolumnType=ColumnType(attribute.dataType) valinitialBufferSize=columnType.defaultSize*batchSize ColumnBuilder(columnType.typeId,initialBufferSize,attribute.name,useCompression) }.toArray //src表里Row是[238,val_238]这行Row的length就是2 varrow:Row=null varrowCount=0 //batchSize默认1000 while(baseIterator.hasNext&&rowCount<batchSize){ //遍历每一条记录 row=baseIterator.next() vari=0 //这里rowlength是2,i的取值是0和1 while(i<row.length){ //获取columnBuilders,0是IntColumnBuilder, //BasicColumnBuilder的appendFrom //Appends`row(ordinal)`tothecolumnbuilder. columnBuilders(i).appendFrom(row,i) i+=1 } //该行已经插入完毕 rowCount+=1 } //limitandrewind,Returnsthefinalcolumnarbytebuffer. columnBuilders.map(_.build()) } defhasNext=baseIterator.hasNext } }.cache() cached.setName(child.toString) _cachedColumnBuffers=cached } 三、Columnar Storage 初始化ColumnBuilders: [java] view plain copy valcolumnBuilders=output.map{attribute=> valcolumnType=ColumnType(attribute.dataType) valinitialBufferSize=columnType.defaultSize*batchSize ColumnBuilder(columnType.typeId,initialBufferSize,attribute.name,useCompression) }.toArray 这里会声明一个数组,来对应每一列的存储,如下图: 然后初始化类型builder的时候会传入的参数: initialBufferSize:文章开头的图中会有ByteBuffer,ByteBuffer的初始化大小是如何计算的? initialBufferSize = 列类型默认长度 × batchSize ,默认batchSize是1000 拿Int类型举例,initialBufferSize of IntegerType = 4 * 1000 attribute.name即字段名age,name etc。。。 ColumnType: ColumnType封装了 该类型的 typeId 和 该类型的 defaultSize。并且提供了extract、append\getField方法,来向buffer里追加和获取数据。 如IntegerType typeId 为0, defaultSize 4 ...... 详细看下类图,画的不是非常严格的类图,主要为了展示目前类型系统: ColumnBuilder: ColumnBuilder的主要职责是:管理ByteBuffer,包括初始化buffer,添加数据到buffer内,检查剩余空间,和申请新的空间这几项主要职责。 initialize负责初始化buffer。 appendFrom是负责添加数据。 ensureFreeSpace确保buffer的长度动态增加。 类图如下: ByteBuffer的初始化过程: 初始化大小initialSize:拿Int举例,在前面builder初始化传入的是4×batchSize=4*1000,initialSize也就是4KB,如果没有传入initialSize,则默认是1024×1024。 列名称,是否需要压缩,都是需要传入的。 ByteBuffer声明时预留了4个字节,为了放column type id,这个在ColumnType的构造里有介绍过。 [java] view plain copy overridedefinitialize( initialSize:Int, columnName:String="", useCompression:Boolean=false)={ valsize=if(initialSize==0)DEFAULT_INITIAL_BUFFER_SIZEelseinitialSize//如果没有默认1024×1024byte this.columnName=columnName //Reserves4bytesforcolumntypeID buffer=ByteBuffer.allocate(4+size*columnType.defaultSize)//buffer的初始化长度,需要加上4byte类型ID空间。 buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)//根据nativeOrder排序,然后首先放入typeId } 存储的方式如下: Int的type id 是0, string的 type id 是 7. 后面就是实际存储的数据了。 ByteBuffer写入过程: 存储结构都介绍完毕,最后开始对Table进行scan了,scan后对每一个分区的每个Row进行操作遍历: 1、读每个分区的每条Row 2、获取每个列的值,从builders数组里找到索引 i 对应的bytebuffer,追加至bytebuffer。 [java] view plain copy while(baseIterator.hasNext&&rowCount<batchSize){ //遍历每一条记录 row=baseIterator.next() vari=0 //这里rowlength是2,i的取值是0和1Ps:还是拿srctable做测试,每一个Row只有2个字段,key,value所有长度为2 while(i<row.length){ //获取columnBuilders,0是IntColumnBuilder, //BasicColumnBuilder的appendFrom //Appends`row(ordinal)`tothecolumnbuilder. columnBuilders(i).appendFrom(row,i)//追加到对应的bytebuffer i+=1 } //该行已经插入完毕 rowCount+=1 } //limitandrewind,Returnsthefinalcolumnarbytebuffer. columnBuilders.map(_.build()) 追加过程: 根据当前builder的类型,从row的对应索引中取出值,最后追加到builder的bytebuffer内。 [java] view plain copy overridedefappendFrom(row:Row,ordinal:Int){ //ordinal是Row的index,0就是第一列值,1就是第二列值,获取列的值为field //最后在将该列的值put到该buffer内 valfield=columnType.getField(row,ordinal) buffer=ensureFreeSpace(buffer,columnType.actualSize(field))//动态扩容 columnType.append(field,buffer) } ensureFreeSpace: 主要是操作buffer,如果要追加的数据大于剩余空间,就扩大buffer。 [java] view plain copy //确保剩余空间能容下,如果剩余空间小于要放入的大小,则重新分配一看内存空间 private[columnar]defensureFreeSpace(orig:ByteBuffer,size:Int)={ if(orig.remaining>=size){//当前buffer剩余空间比要追加的数据大,则什么都不做,返回自身 orig }else{//否则扩容 //growinstepsofinitialsize valcapacity=orig.capacity() valnewSize=capacity+size.max(capacity/8+1) valpos=orig.position() orig.clear() ByteBuffer .allocate(newSize) .order(ByteOrder.nativeOrder()) .put(orig.array(),0,pos) } } ...... 最后调用MapPartitionsRDD.cache(),将该RDD缓存并添加到spark cache管理中。 至此,我们将一张spark sql table缓存到了spark的jvm中。 四、总结 对于数据的存储结构,我们常常关注持久化的存储结构,并且在长久时间内有了很多种高效结构。 但是在实时性的要求下,内存数据库越来越被关注,如何优化内存数据库的存储结构,是一个重点,也是一个难点。 对于Spark SQL 和 Shark 里的列存储 是一种优化方案,提高了关系查询中列查询的速度,和减少了内存占用。但是中存储方式还是比较简单的,没有额外的元数据和索引来提高查询效率,希望以后能了解到更多的In-Memory Storage。 ——EOF—— 创文章,转载请注明: 转载自:OopsOutOfMemory盛利的Blog,作者:OopsOutOfMemory 本文链接地址:http://blog.csdn.net/oopsoom/article/details/39525483 注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

优秀的个人博客,低调大师

Spark-SparkSQL深入学习系列十一(转自OopsOutOfMemory)

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的。 /**SparkSQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例http://blog.csdn.net/oopsoom/article/details/42061077) 一、Sources包核心 Spark SQL在Spark1.2中提供了External DataSource API,开发者可以根据接口来实现自己的外部数据源,如avro, csv, json, parquet等等。 在Spark SQL源代码的org/spark/sql/sources目录下,我们会看到关于External DataSource的相关代码。这里特别介绍几个: 1、DDLParser 专门负责解析外部数据源SQL的SqlParser,解析createtemporarytable xxx using options (key 'value', key 'value') 创建加载外部数据源表的语句。 [java] view plain copy protectedlazyvalcreateTable:Parser[LogicalPlan]= CREATE~TEMPORARY~TABLE~>ident~(USING~>className)~(OPTIONS~>options)^^{ casetableName~provider~opts=> CreateTableUsing(tableName,provider,opts) } 2、CreateTableUsing 一个RunnableCommand,通过反射从外部数据源lib中实例化Relation,然后注册到为temp table。 [java] view plain copy private[sql]caseclassCreateTableUsing( tableName:String, provider:String,//org.apache.spark.sql.json options:Map[String,String])extendsRunnableCommand{ defrun(sqlContext:SQLContext)={ valloader=Utils.getContextOrSparkClassLoader valclazz:Class[_]=tryloader.loadClass(provider)catch{//doreflection casecnf:java.lang.ClassNotFoundException=> tryloader.loadClass(provider+".DefaultSource")catch{ casecnf:java.lang.ClassNotFoundException=> sys.error(s"Failedtoloadclassfordatasource:$provider") } } valdataSource=clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]//json包DefaultDataSource valrelation=dataSource.createRelation(sqlContext,newCaseInsensitiveMap(options))//创建JsonRelation sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)//注册 Seq.empty } } 2、DataSourcesStrategy 在 Strategy 一文中,我已讲过Streategy的作用,用来Plan生成物理计划的。这里提供了一种专门为了解析外部数据源的策略。 最后会根据不同的BaseRelation生产不同的PhysicalRDD。不同的BaseRelation的scan策略下文会介绍。 [java] view plain copy private[sql]objectDataSourceStrategyextendsStrategy{ defapply(plan:LogicalPlan):Seq[SparkPlan]=planmatch{ casePhysicalOperation(projectList,filters,l@LogicalRelation(t:CatalystScan))=> pruneFilterProjectRaw( l, projectList, filters, (a,f)=>t.buildScan(a,f))::Nil ...... casel@LogicalRelation(t:TableScan)=> execution.PhysicalRDD(l.output,t.buildScan())::Nil case_=>Nil } 3、interfaces.scala 该文件定义了一系列可扩展的外部数据源接口,对于想要接入的外部数据源,我们只需实现该接口即可。里面比较重要的traitRelationProvider 和BaseRelation,下文会详细介绍。 4、filters.scala 该Filter定义了如何在加载外部数据源的时候,就进行过滤。注意哦,是加载外部数据源到Table里的时候,而不是Spark里进行filter。这个有点像hbase的coprocessor,查询过滤在Server上就做了,不在Client端做过滤。 5、LogicalRelation 封装了baseRelation,继承了catalyst的LeafNode,实现MultiInstanceRelation。 二、External DataSource注册流程 用spark sql下sql/json来做示例, 画了一张流程图,如下: 注册外部数据源的表的流程: 1、提供一个外部数据源文件,比如json文件。 2、提供一个实现了外部数据源所需要的interfaces的类库,比如sql下得json包,在1.2版本后改为了External Datasource实现。 3、引入SQLContext,使用DDL创建表,如createtemporarytable xxx using options (key 'value', key 'value') 4、External Datasource的DDLParser将对该SQL进行Parse 5、Parse后封装成为一个CreateTableUsing类的对象。该类是一个RunnableCommand,其run方法会直接执行创建表语句。 6、该类会通过反射来创建一个org.apache.spark.sql.sources.RelationProvider,该trait定义要createRelation,如json,则创建JSONRelation,若avro,则创建AvroRelation。 7、得到external releation后,直接调用SQLContext的baseRelationToSchemaRDD转换为SchemaRDD 8、最后registerTempTable(tableName) 来注册为Table,可以用SQL来查询了。 三、External DataSource解析流程 先看图,图如下: Spark SQL解析SQL流程如下: 1、Analyzer通过Rule解析,将UnresolvedRelation解析为JsonRelation。 2、通过Parse,Analyzer,Optimizer最后得到JSONRelation(file:///path/to/shengli.json,1.0) 3、通过sources下得DataSourceStrategy将LogicalPlan映射到物理计划PhysicalRDD。 4、PhysicalRDD里包含了如何查询外部数据的规则,可以调用execute()方法来执行Spark查询。 四、External Datasource Interfaces 在第一节我已经介绍过,主要的interfaces,主要看一下BaseRelation和RelationProvider。 如果我们要实现一个外部数据源,比如avro数据源,支持spark sql操作avro file。那么久必须定义AvroRelation来继承BaseRelation。同时也要实现一个RelationProvider。 BaseRelation: 是外部数据源的抽象,里面存放了 schema的映射,和 如何scan数据的规则。 [java] view plain copy abstractclassBaseRelation{ defsqlContext:SQLContext defschema:StructType [java] view plain copy abstractclassPrunedFilteredScanextendsBaseRelation{ defbuildScan(requiredColumns:Array[String],filters:Array[Filter]):RDD[Row] } 1、schema我们如果自定义Relation,必须重写schema,就是我们必须描述对于外部数据源的Schema。 2、buildScan我们定义如何查询外部数据源,提供了4种Scan的策略,对应4种BaseRelation。 我们支持4种BaseRelation,分为TableScan, PrunedScan,PrunedFilterScan,CatalystScan。 1、 TableScan: 默认的Scan策略。 2、 PrunedScan: 这里可以传入指定的列,requiredColumns,列裁剪,不需要的列不会从外部数据源加载。 3、 PrunedFilterScan: 在列裁剪的基础上,并且加入Filter机制,在加载数据也的时候就进行过滤,而不是在客户端请求返回时做Filter。 4、 CatalystScan: Catalyst的支持传入expressions来进行Scan。支持列裁剪和Filter。 RelationProvider: 我们要实现这个,接受Parse后传入的参数,来生成对应的External Relation,就是一个反射生产外部数据源Relation的接口。 [java] view plain copy traitRelationProvider{ /** *Returnsanewbaserelationwiththegivenparameters. *Note:theparameters'keywordsarecaseinsensitiveandthisinsensitivityisenforced *bytheMapthatispassedtothefunction. */ defcreateRelation(sqlContext:SQLContext,parameters:Map[String,String]):BaseRelation } 五、External Datasource定义示例 在Spark1.2之后,json和parquet也改为通过实现External API来进行外部数据源查询的。 下面以json的外部数据源定义为示例,说明是如何实现的: 1、JsonRelation 定义处理对于json文件的,schema和Scan策略,均基于JsonRDD,细节可以自行阅读JsonRDD。 [java] view plain copy private[sql]caseclassJSONRelation(fileName:String,samplingRatio:Double)( @transientvalsqlContext:SQLContext) extendsTableScan{ privatedefbaseRDD=sqlContext.sparkContext.textFile(fileName)//读取jsonfile overridevalschema= JsonRDD.inferSchema(//jsonRDD的inferSchema方法,能自动识别json的schema,和类型type。 baseRDD, samplingRatio, sqlContext.columnNameOfCorruptRecord) overridedefbuildScan()= JsonRDD.jsonStringToRow(baseRDD,schema,sqlContext.columnNameOfCorruptRecord)//这里还是JsonRDD,调用jsonStringToRow查询返回Row } 2、DefaultSource parameters中可以获取到options中传入的path等自定义参数。 这里接受传入的参数,来狗仔JsonRelation。 [java] view plain copy private[sql]classDefaultSourceextendsRelationProvider{ /**Returnsanewbaserelationwiththegivenparameters.*/ overridedefcreateRelation( sqlContext:SQLContext, parameters:Map[String,String]):BaseRelation={ valfileName=parameters.getOrElse("path",sys.error("Option'path'notspecified")) valsamplingRatio=parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) JSONRelation(fileName,samplingRatio)(sqlContext) } } 六、总结 External DataSource源码分析下来,可以总结为3部分。 1、外部数据源的注册流程 2、外部数据源Table查询的计划解析流程 3、如何自定义一个外部数据源,重写BaseRelation定义外部数据源的schema和scan的规则。定义RelationProvider,如何生成外部数据源Relation。 External Datasource此部分API还有可能在后续的build中改动,目前只是涉及到了查询,关于其它的操作还未涉及。 ——EOF—— 原创文章,转载请注明: 转载自:OopsOutOfMemory盛利的Blog,作者:OopsOutOfMemory 本文链接地址:http://blog.csdn.net/oopsoom/article/details/42064075 注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

优秀的个人博客,低调大师

Spark-SparkSQL深入学习系列十(转自OopsOutOfMemory)

/**SparkSQL源码分析系列文章*/ 前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的。 那么基于以上存储结构,我们查询cache在jvm内的数据又是如何查询的,本文将揭示查询In-Memory Data的方式。 一、引子 本例使用hive console里查询cache后的src表。 select value from src 当我们将src表cache到了内存后,再次查询src,可以通过analyzed执行计划来观察内部调用。 即parse后,会形成InMemoryRelation结点,最后执行物理计划时,会调用InMemoryColumnarTableScan这个结点的方法。 如下: [java] view plain copy scala>valexe=executePlan(sql("selectvaluefromsrc").queryExecution.analyzed) 14/09/2610:30:26INFOparse.ParseDriver:Parsingcommand:selectvaluefromsrc 14/09/2610:30:26INFOparse.ParseDriver:ParseCompleted exe:org.apache.spark.sql.hive.test.TestHive.QueryExecution= ==ParsedLogicalPlan== Project[value#5] InMemoryRelation[key#4,value#5],false,1000,(HiveTableScan[key#4,value#5],(MetastoreRelationdefault,src,None),None) ==AnalyzedLogicalPlan== Project[value#5] InMemoryRelation[key#4,value#5],false,1000,(HiveTableScan[key#4,value#5],(MetastoreRelationdefault,src,None),None) ==OptimizedLogicalPlan== Project[value#5] InMemoryRelation[key#4,value#5],false,1000,(HiveTableScan[key#4,value#5],(MetastoreRelationdefault,src,None),None) ==PhysicalPlan== InMemoryColumnarTableScan[value#5],(InMemoryRelation[key#4,value#5],false,1000,(HiveTableScan[key#4,value#5],(MetastoreRelationdefault,src,None),None))//查询内存中表的入口 CodeGeneration:false ==RDD== 二、InMemoryColumnarTableScan InMemoryColumnarTableScan是Catalyst里的一个叶子结点,包含了要查询的attributes,和InMemoryRelation(封装了我们缓存的In-Columnar Storage数据结构)。 执行叶子节点,出发execute方法对内存数据进行查询。 1、查询时,调用InMemoryRelation,对其封装的内存数据结构的每个分区进行操作。 2、获取要请求的attributes,如上,查询请求的是src表的value属性。 3、根据目的查询表达式,来获取在对应存储结构中,请求列的index索引。 4、通过ColumnAccessor来对每个buffer进行访问,获取对应查询数据,并封装为Row对象返回。 [java] view plain copy private[sql]caseclassInMemoryColumnarTableScan( attributes:Seq[Attribute], relation:InMemoryRelation) extendsLeafNode{ overridedefoutput:Seq[Attribute]=attributes overridedefexecute()={ relation.cachedColumnBuffers.mapPartitions{iterator=> //Findtheordinalsoftherequestedcolumns.Ifnonearerequested,usethefirst. valrequestedColumns=if(attributes.isEmpty){ Seq(0) }else{ attributes.map(a=>relation.output.indexWhere(_.exprId==a.exprId))//根据表达式exprId找出对应列的ByteBuffer的索引 } iterator .map(batch=>requestedColumns.map(batch(_)).map(ColumnAccessor(_)))//根据索引取得对应请求列的ByteBuffer,并封装为ColumnAccessor。 .flatMap{columnAccessors=> valnextRow=newGenericMutableRow(columnAccessors.length)//Row的长度 newIterator[Row]{ overridedefnext()={ vari=0 while(i<nextRow.length){ columnAccessors(i).extractTo(nextRow,i)//根据对应index和长度,从byterbuffer里取得值,封装到row里 i+=1 } nextRow } overridedefhasNext=columnAccessors.head.hasNext } } } } } 查询请求的列,如下: [java] view plain copy scala>exe.optimizedPlan res93:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= Project[value#5] InMemoryRelation[key#4,value#5],false,1000,(HiveTableScan[key#4,value#5],(MetastoreRelationdefault,src,None),None) scala>valrelation=exe.optimizedPlan(1) relation:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan= InMemoryRelation[key#4,value#5],false,1000,(HiveTableScan[key#4,value#5],(MetastoreRelationdefault,src,None),None) scala>valrequest_relation=exe.executedPlan request_relation:org.apache.spark.sql.execution.SparkPlan= InMemoryColumnarTableScan[value#5],(InMemoryRelation[key#4,value#5],false,1000,(HiveTableScan[key#4,value#5],(MetastoreRelationdefault,src,None),None)) scala>request_relation.output//请求的列,我们请求的只有value列 res95:Seq[org.apache.spark.sql.catalyst.expressions.Attribute]=ArrayBuffer(value#5) scala>relation.output//默认保存在relation中的所有列 res96:Seq[org.apache.spark.sql.catalyst.expressions.Attribute]=ArrayBuffer(key#4,value#5) scala>valattributes=request_relation.output attributes:Seq[org.apache.spark.sql.catalyst.expressions.Attribute]=ArrayBuffer(value#5) 整个流程很简洁,关键步骤是第三步。根据ExprId来查找到,请求列的索引 attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) [java] view plain copy //根据exprId找出对应ID scala>valattr_index=attributes.map(a=>relation.output.indexWhere(_.exprId==a.exprId)) attr_index:Seq[Int]=ArrayBuffer(1)//找到请求的列value的索引是1,我们查询就从Index为1的bytebuffer中,请求数据 scala>relation.output.foreach(e=>println(e.exprId)) ExprId(4)//对应<spanstyle="font-family:Arial,Helvetica,sans-serif;">[key#4,value#5]</span> ExprId(5) scala>request_relation.output.foreach(e=>println(e.exprId)) ExprId(5) 三、ColumnAccessor ColumnAccessor对应每一种类型,类图如下: 最后返回一个新的迭代器: [java] view plain copy newIterator[Row]{ overridedefnext()={ vari=0 while(i<nextRow.length){//请求列的长度 columnAccessors(i).extractTo(nextRow,i)//调用columnType.setField(row,ordinal,extractSingle(buffer))解析buffer i+=1 } nextRow//返回解析后的row } overridedefhasNext=columnAccessors.head.hasNext } 四、总结 Spark SQLIn-Memory Columnar Storage的查询相对来说还是比较简单的,其查询思想主要和存储的数据结构有关。 即存储时,按每列放到一个bytebuffer,形成一个bytebuffer数组。 查询时,根据请求列的exprId查找到上述数组的索引,然后使用ColumnAccessor对buffer中字段进行解析,最后封装为Row对象,返回。 ——EOF—— 创文章,转载请注明: 转载自:OopsOutOfMemory盛利的Blog,作者:OopsOutOfMemory 本文链接地址:http://blog.csdn.net/oopsoom/article/details/39577419 注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

优秀的个人博客,低调大师

学习Docker容器网络模型 - 搭建分布式Zookeeper集群

ZooKeeper是一个流行的分布式协调服务。它提供了两种不同的部署方式:单机模式和分布式模式。其中单机模式的部署非常简单,网上也有很多资料,我们今天会利用Docker搭建分布式Zookeeper集群,并来帮助大家熟悉Docker中容器网络模型的使用。 ZooKeeper集群中所有的节点作为一个整体对分布式应用提供服务。节点中有两个的角色:Leader和Follower。在整个集群运行过程中,只有一个Leader,其他节点的都是Follower,如果ZK集群在运行过程中Leader出了问题,系统会采用选举算法重新在集群节点选出一个Leader。 Zookeeper节点之间是利用点对点的方式互相联结在一起的,这样的点对点部署方式对利用Docker容器搭建ZK集群提出了挑战。这是因为Zookeeper集群中每个节点需要在启动之前获得集群中

优秀的个人博客,低调大师

Mahout分类算法学习之实现Naive Bayes分类示例

1.简介 (1) 贝叶斯分类器的分类原理发源于古典概率理论,是通过某对象的先验概率,利用贝叶斯公式计算出其后验概率,即该对象属于某一类的概率,选择具有最大后验概率的类作为该对象所属的类。朴素贝叶斯分类器(Naive Bayes Classifier)做了一个简单的假定:给定目标值时属性之间相互条件独立,即给定元组的类标号,假定属性值有条件地相互独立,即在属性间不存在依赖关系。朴素贝叶斯分类模型所需估计的参数很少,对缺失数据不太敏感,算法也比较简单。 (2) Mahout 实现了Traditional Naive Bayes 和Complementary Naive Bayes,后者是在前者的基础上增加了结果分析功能(Result Analyzer). (3) 主要相关的Mahout类: org.apache.mahout.classifier.naivebayes.NaiveBayesModel org.apache.mahout.classifier.naivebayes.StandardNaiveBayesClassifier org.apache.mahout.classifier.naivebayes.ComplementaryNaiveBayesClassifier 2.数据 使用20 newsgroups data (http://people.csail.mit.edu/jrennie/20Newsgroups/20news-bydate.tar.gz) ,数据集按时间分为训练数据和测试数据,总大小约为85MB,每个数据文件为一条信息,文件头部几行指定消息的发送者、长度、类型、使用软件,以及主题等,然后用空行将其与正文隔开,正文没有固定的格式。 3.目标 根据新闻文档内容,将其分到不同的文档类型中。 4.程序 使用Mahout自带示例程序,主要的训练类和测试类分别为TrainNaiveBayesJob.java和TestNaiveBayesDriver.java,JAR包为mahout-core-0.7-job.jar,详细代码见(mahout-distribution-0.7/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainning,mahout-distribution-0.7/core/src/main/java/org/apache/mahout/classifier/naivebayes/test). 5.步骤 (1) 数据准备 ①将20news-bydate.tar.gz解压,并将20news-bydate中的所有子文夹中的内容复制到20news-all中,该步骤已经完成,20news-all文件夹存放在hdfs:/share/data/ Mahout_examples_Data_Set中 ②将20news-all放在hdfs的用户根目录下 user@hadoop:~/workspace$hadoop dfs -cp /share/data/Mahout_examples_Data_Set/20news-all . ③从20newsgroups data创建序列文件(sequence files) user@hadoop:~/workspace$mahout seqdirectory -i 20news-all -o 20news-seq ④将序列文件转化为向量 user@hadoop:~/workspace$mahout seq2sparse -i ./20news-seq -o ./20news-vectors -lnorm -nv -wt tfidf ⑤将向量数据集分为训练数据和检测数据,以随机40-60拆分 user@hadoop:~/workspace$mahout split -i ./20news-vectors/tfidf-vectors --trainingOutput ./20news-train-vectors --testOutput ./20news-test-vectors --randomSelectionPct 40 --overwrite --sequenceFiles -xm sequential (2)训练朴素贝叶斯模型 user@hadoop:~/workspace$mahout trainnb -i ./20news-train-vectors -el -o ./model -li ./labelindex -ow -c (3)检验朴素贝叶斯模型 user@hadoop:~/workspace$mahout testnb -i ./20news-train-vectors -m ./model -l ./labelindex -ow -o 20news-testing -c 结果如下: (4)检测模型分类效果 user@hadoop:~/workspace$mahout testnb -i ./20news-test-vectors -m ./model -l ./labelindex -ow -o ./20news-testing -c 结果如下: (5)查看结果,将序列文件转化为文本 user@hadoop:~/workspace$mahout seqdumper -i ./20news-testing/part-m-00000 -o ./20news_testing.res user@hadoop:~/workspace$cat 20news_testging.res 结果如下:

优秀的个人博客,低调大师

Mahout学习之运行canopy算法错误及解决办法

一:将Text转换成Vector序列文件时 在Hadoop中运行编译打包好的jar程序,可能会报下面的错误: Exceptioninthread"main"java.lang.NoClassDefFoundError: org/apache/mahout/common/AbstractJob 书中和网上给的解决办法都是:把Mahout根目录下的相应的jar包复制到Hadoop根目录下的lib文件夹下,同时重启Hadoop 但是到了小编这里不管怎么尝试,都不能解决,最终放弃了打包成jar运行的念头,就在对源码进行了修改,在eclipse运行了 二:java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text 此种错误,是由于map和reduce端函数格式输入输出不一致,导致数据类型不匹配 在次要注意一个特别容易出错的地方:Mapper和Reducer类中的函数必须是map和reduce,名字不能改,因为是继承Mapper类和Reducer类,如果函数名字改变了的话也可能造成以上的错误,或者Reducer端不输出 三:当在命令行里直接用命令转化文件格式时抛出如下错误: ERROR common.AbstractJob: Unexpected --seqFileDir while processing Job-Specific Options 注:转化命令为:bin/mahout clusterdump --seqFileDir /home/thinkgamer/document/canopy/output/clusters-0-final/ --pointsDir /home/thinkgamer/document/canopy/output/clusteredPoints/ --output /home/thinkgamer/document/canopy/clusteranalyze.txt 上网搜了搜热心的网友给出的解决办法是:将--seqFileDir换成--input即可

优秀的个人博客,低调大师

Hadoop学习笔记(二):MapReduce的特性-计数器、排序

计数器 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。说白了就是统计整个mr作业所有数据行中符合某个if条件的数量,(除某些内置计数器之外)。仅当一个作业执行成功之后,计数器的值才是完整可靠的。如果一个任务在作业执行期间失败,则相关计数器值会减小,计数器是全局的。 计数器分为以下几种: 1)内置计数器,内置的作业计数器实际上由jobtracker维护,而不必在整个网络中发送; 2)用户自定义的java计数器,由其关联任务维护,并定期传到tasktracker,再由tasktracker传给jobtracker,可以定义多个枚举类型,每个枚举类型有多个字段,枚举类型名称即为组名,枚举字段名称即为计数器名称。 Reporter对象的incrCounter()方法重载: public void incrCounter(enum,long amout) 3)动态计数器,不由java枚举类型定义的计数器,由于在编译阶段就已指定java枚举类型的字段,故无法使用枚举类型动态新建计数器。 Reporter对象的incrCounter()方法重载: public void incrCounter(String group,String counter,long amout) 自定义java计数器由于使用枚举类型,默认名称为(枚举名称)$(枚举字段名称),可读性较差,可以通过设置一个属性文件将计数器重命名。属性文件名称为(使用该计数器的类名)_(组名).properties,文件内容见下面的代码吧,不好描述了~ 代码中使用了自定义java计数器和动态计数器: import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import common.JobBuilder; import common.NcdcRecordParser; public class TestTemperatureWithCounters extends Configured implements Tool { enum Temperature { MISSING, MALFORMED, VALIDATED } static class MaxTemperatureMapperWithCounters extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { parser.parse(value); if (parser.isValidTemperature()) { int airTemperature = parser.getAirTemperature(); output.collect(new Text(parser.getYear()), new IntWritable( airTemperature)); reporter.incrCounter(Temperature.VALIDATED, 1); } else if (parser.isMalformedTemperature()) { reporter.incrCounter(Temperature.MALFORMED, 1); } else if (parser.isMissingTemperature()) { reporter.incrCounter(Temperature.MISSING, 1); } // dynarnic counter reporter.incrCounter("TemperatureCountByYear", parser.getYear(), 1); } } static class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxValue = Integer.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } } @Override public int run(String[] args) throws Exception { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1; } conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MaxTemperatureMapperWithCounters.class); conf.setReducerClass(MaxTemperatureReducer.class); JobClient.runJob(conf); return 0; } /** * @param args */ public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new TestTemperatureWithCounters(), args); System.exit(exitCode); //args //hdfs://192.168.174.128:9000/user/root/input/Temperature.txt hdfs://192.168.174.128:9000/user/root/output/testCounters0006 } } 这里提一句,在写的时候犯了一个低级错误:由于在帮助类NcdcRecordParser中开始是将boolean isMalformed=false; 变量声明处直接设置初始值,而不是在parse方法中设置为false。而在引用的MaxTemperatureMapperWithCounters类中却是在map函数之外实例化NcdcRecordParser的,导致当第一次isMalformed赋值为true之后,值一直为true,(由于同一个tasktracker调用的同一个map作业中,只会实例化一次Map类,为输入分片中每条记录调用一次map函数)马虎大意了~ 记录一下。 下面贴上控制台输出的计数器结果截图:(未使用配置文件重命名自定义java计数器) 重命名自定义java计数器配置文件:(名称为TestTemperatureWithCounters_Temperature.properties) CounterGroupName= Air Temperature Records MISSING.name=Missing MALFORMED.name=Malformed VALIDATED.name=Validated 使用配置文件重命名自定义java计数器截图: 排序 个人感觉排序是整个MR作业中比较重要的一块,所以需要反复研究,透彻研究! 部分排序: MapReduce的每个reducer的输入都是按键排序的。系统执行排序的过程-将Map的输出作为输入传递给reducer---称为shuffle。 整个排序过程分为map端操作和reduce端操作; 1)在map端:map函数的输出将利用缓冲的方式写到内存,并进行预排序。在这个过程中,如果map的输出大于环形内存缓冲区的阀值(缓冲区默认大小100M,阀值默认为80%),则会将写入溢出文件。当输入分片执行map函数全部结束,即写入内存和溢出文件结束后,将执行patitioner(这个是重点。。。。。。) patitioner执行后会将输出合并成一个已分区且已排序的输出文件,这是排序操作的第一步。(在这个输出文件写入到本地磁盘之前,将执行combiner操作和压缩操作,ps:如果有这些操作的话~) 这是map端的排序了。这样可以保证每个map到所有reduce的分区都是安key值已排序的。那么reduce接收的将是多个已排序的输入分区数据。(ps:并不是连在一起整个排好序的文件了~) 比如:假设存在map01,map02,reduce01,map01输出给reduce01的分区数据key值为01,03,05;map02输出给reduce01的分区数据key值位02,04,06。 2)在reduce端:map端输出文件位于运行map任务的tasktracker的本地磁盘,reduce端通过线程定期询问jobtracker以获得map输出的位置。复制这些map端输出文件到reduce端,并执行合并操作。将已排序的map端输出合并为一个或者多个合并后文件作为reduce操作的输入。(合并后的文件依然是排好序的;合并操作次数由io.sort.factor属性控制) 这里面有几个地方需要注意:1) map函数和reduce函数应该尽量少用内存,避免在map中堆积数据(需要留给shuffle~) 2) 避免map端多次溢出写磁盘,来获得最佳性能(估算map操作输出的大小,然后通过合理设置各项属性~) 全排序: 部分排序的reduce输入是有序的,但是多个reduce之间并不是一个接一个的有序的。所以reduce的输出是不能拼接起来有序的。(除非把reduce个数设置为1,但是这样就体现不出分布式的优势了~) 为了达到全排序的效果,需要自定义map操作输出时候的partion操作,让key值不同区间的数据落到不同区间,这样就保证了每个reduce的输入数据集和其他的reduce输入数据集连接起来是有序的。就实现了全排序的效果!比如:让key值属于[0,10)的数据非配到一个partition,[10,20)的数据分配到一个partition,以此类推。这样将reduce的输出文件直接拼接就能得到一个完整有序的最终全排序的结果了! 但是这里有个问题,由于数据是不规律的,向上面提到的区间[0,10)[10,20)这样划分区间肯定无法保证每个区间中的数据是均匀分布的,这样整个MR操作的时间将由最多数据的那个reduce决定,这样肯定是不可取的。为了达到这个目的,有一个取样器的概念。所谓取样器就是按一定规则从整个大数据集中取样出一小部分数据来观察大致的分布用来划分区间。以达到分区后每个reduce的输入能大概均匀分布的目的。 上代码: import java.net.URI; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.lib.InputSampler; import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; import common.JobBuilder; public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool { @Override public int run(String[] args) throws Exception { JobConf conf = JobBuilder.parseInputAndOutput(this, this.getConf(), args); if (conf == null) { return -1; } conf.setInputFormat(SequenceFileInputFormat.class); conf.setOutputKeyClass(IntWritable.class); conf.setOutputFormat(SequenceFileOutputFormat.class); // SequenceFileOutputFormat.setCompressOutput(conf, true); // SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class); // SequenceFileOutputFormat.setOutputCompressionType(conf,CompressionType.BLOCK); conf.setPartitionerClass(TotalOrderPartitioner.class); InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>( 0.1, 10000, 10); Path input = FileInputFormat.getInputPaths(conf)[0]; input = input.makeQualified(input.getFileSystem(conf)); Path partitionFile = new Path(input, "_partitions"); TotalOrderPartitioner.setPartitionFile(conf, partitionFile); InputSampler.writePartitionFile(conf, sampler); URI partitionUri = new URI(partitionFile.toString() + "#_partitions"); DistributedCache.addCacheFile(partitionUri, conf); DistributedCache.createSymlink(conf); JobClient.runJob(conf); return 0; } /** * @param args */ public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new SortByTemperatureUsingTotalOrderPartitioner(), args); System.exit(exitCode); //hdfs://192.168.174.128:9000/user/root/input/Temperature-seq -D mapred.reduce.tasks=3 hdfs://192.168.174.128:9000/user/root/output/Temperature-totalsort } } 由于我现在还是在伪分布式环境开发,只能有一个map一个reduce 这里设置tasks=3会报错了,而且本地由于只有一个reduce,其实这个所谓的全排序在我这里是看不到啥实际意义了。等过段时间装个分布式环境吧。 其中writePartitionFile(JobConfjob,InputSampler.Sampler<K,V>sampler)这个函数是将采样的结果排序,然后按照分区的个数n,将排序后的结果平均分为n分,取n-1个分割点,这个分割点具体取的时候,运用了一些4舍5入的方法,最简答的理解就是取后n-1个组中每组的第一个采样值就OK了。 这块需要再研究研究~ 辅助排序 辅助排序使用到的几个关键函数: setPartitionerClass(class);//自定义分区 setOutputValueGroupingComparator(class);// 提供分组规则, 使得不同 Key的中间数据,能够被分发到同一个reduce 处理,在下面的例子中即是将同一年份数据分发到同一reduce;这里设置group,就是按key的哪一个参数进行聚合, setOutputKeyComparatorClass(class);// 中间数据在被reduce处理之前,会先被集中排序,setOutputKeyComparatorClass提供排序规则,在下面的例子中即是将同一reduce接收到的数据以年份升序,温度降序排序;这个是设置key的比较器,设置聚合的key的一个二次排序方法 代码中:key为组合(年份,温度) value为nullwritable import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Partitioner; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import common.IntPair; import common.JobBuilder; import common.NcdcRecordParser; public class MaxTemperatureUsingSecondarySort extends Configured implements Tool { static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override public void map(LongWritable key, Text value, OutputCollector<IntPair, NullWritable> output, Reporter reporter) throws IOException { parser.parse(value); if (parser.isValidTemperature()) { output.collect( new IntPair(parser.getYearInt(), parser .getAirTemperature()), NullWritable.get()); } } } static class MaxTemperatureReducer extends MapReduceBase implements Reducer<IntPair, NullWritable, IntPair, NullWritable> { @Override public void reduce(IntPair key, Iterator<NullWritable> values, OutputCollector<IntPair, NullWritable> output, Reporter reporter) throws IOException { output.collect(key, NullWritable.get()); } } public static class FirstPartitioner implements Partitioner<IntPair, NullWritable> { @Override public void configure(JobConf conf) { } @Override public int getPartition(IntPair key, NullWritable value, int numPartitions) { return Math.abs(key.getFirst() * 127) % numPartitions; } } public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; int cmp = Integer.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -Integer.compare(ip1.getSecond(), ip2.getSecond()); } } public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; return Integer.compare(ip1.getFirst(), ip2.getFirst()); } } @Override public int run(String[] args) throws Exception { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) return -1; conf.setMapperClass(MaxTemperatureMapper.class); conf.setPartitionerClass(FirstPartitioner.class); conf.setOutputKeyComparatorClass(KeyComparator.class); conf.setOutputValueGroupingComparator(GroupComparator.class); conf.setReducerClass(MaxTemperatureReducer.class); conf.setOutputKeyClass(IntPair.class); conf.setOutputValueClass(NullWritable.class); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); } } 这里有些地方不大理解,如果使用默认reduce的情况下(注释掉conf.setReducerClass这一行)不理解为啥所有记录的key都成了温度最大的那条记录的key,即同一年份第一条记录的key, 如果继续使用默认的OutputValueGroupingComparator(由于现在只有一个reduce,故注释掉不会影响现有逻辑,注释掉conf.setOutputValueGroupingComparator这一行),才会是所有记录年份升序,温度降序排列的结果。 需要在分布式上执行多个map,reduce的时候试一下,到时候需要再研究研究。 (之前看到别人的博客上有提到过好像是 reduce如果判断为key值相等的情况下,则会只使用第一条记录的key,不会再去取之后记录的key,好像是这个原因了,我试验了下将GroupComparator.class中的逻辑改成和KeyComparator.class中逻辑一致,将会输出所有记录去掉重复行后的年份升序,温度降序排列的结果) 好像又不是这么理解的,暂时留着吧,有时间继续查资料了~

优秀的个人博客,低调大师

Hadoop学习第四天之hadoop命令操作(上)

Hadoop命令操作 格式:hadoop fs 可以在/etc/profile 中添加 alias hf=’hadoopfs’ #简化每次命令的书写 -ls<path> 表示对hdfs下一级目录的查看 -lsr<path> 表示对hdfs目录的递归查看 -mkdir <path> 创建目录 -put<src> <des> 从linux上传文件到hdfs (同-copyFromLocal)可以上传多个文件,最后一个为目的地址 -get<src> <des> 从hdfs下载文件到linux (同-copyToLocal),可以在下载的时候重命名文件 -text<path> 查看文件内容 -rm<path> 删除文件,显示红框内容显示删除成功,否则会显示删除不成功的错误提示 -rmr<path> 表示递归删除文件, 删除文件(夹) -touchz <path> //创建一个空文件 -help [cmd] //显示命令的帮助信息 -du <path> //显示目录中所有文件大小,前面是文件(目录)大小,后面是完整的路径 -dus <path> //显示指定路径的空间大小 -count[-q] <path> //显示目录中目录数量、文件数量、空间大小、完整路径 -mv <src> <dst> //移动多个文件到目标目录(都是hdfs目录) -cp <src> <dst> //复制多个文件到目标目录(都是hdfs目录) -copyFromLocal //同put -moveFromLocal //从本地文件移动到hdfs -getmerge <src> <localdst> //将源目录中的所有文件排序合并到一个文件中 -cat <src> //在终端显示文件内容,同-text

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册