首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

《从零开始学Swift》学习笔记(Day 61)——Core Foundation框架之内存管理

在Swift原生数据类型、Foundation框架数据类型和Core Foundation框架数据类型之间转换过程中,虽然是大部分是可以零开销桥接,零开销并不意味着内存什么都不用管。Swift类型内存管理是采用ARC,Foundation类型和Core Foundation类型内存管理都是采用MRC或ARC,CoreFoundation类型内存管理是基于C语言风格的,它有一个对象所有权的概念 Objective-C的MRC内存管理 Core Foundation的内存管理与Objective-C的MRC内存管理密不可分,先介绍一下Objective-C的MRC内存管理。 所有Objective-C类都继承NSObject类,每个NSObject对象都有一个内部计数器,这个计数器跟踪对象的引用次数,被称为“引用计数”(Reference Count,简称RC)。当对象被创建时候,引用计数为1。为了保证对象的存在,可以调用retain方法保持对象,retain方法会使其引用计数加1,如果不需要这个对象可以调用release或autorelease方法,release或autorelease方法使其引用计数减1。当对象的引用计数为0的时候,系统运行环境才会释放对象内存。 引用计数示例如图所示,首先在第①步调用者A中创建了一个NSObject对象,这时该对象引用计数为1。在第②步调用者B中想使用这个NSObject对象,于是使用NSObject对象引用,但是为了防止使用过程中NSObject对象被释放,可以调用retain方法使引用计数加1,这时引用计数为2。在第③步调用者A中调用release或autorelease方法,使引用计数减1,这时引用计数为1。在第④步调用者C中调用release或autorelease方法,只是获得NSObject对象引用,并没有调用retain、release或autorelease方法,因此没有引起引用计数的变化。在第⑤步调用者B中调用release或autorelease方法使引用计数减1,这时引用计数为0。这个时候NSObject对象就内存就可以释放了。 来总结一下: 1.谁创建或拷贝对象,他也一定要负责调用NSObject对象release或autorelease方法,使引用计数减1,如图中调用者A在第①步,负责创建了NSObject对象,那么调用者A也必须是负责使引用计数减1,见第④步。 2.谁调用retain方法使引用计数加1,它也一定要负责调用NSObject对象release或autorelease方法,使引用计数减1,如图中调用者B在第②步,调用者B调用NSObject对象retain方法使引用计数加1,那么调用者B也必须是负责使引用计数减1,见第⑤步。 对象所有权 一个对象可以有一个或多个所有者,从所有者的角度看是对这个对象具有了“所有权”,从上图中看,调用者A和调用者B是所有者,他们可能是一段程序,可能是一个对象。他们对NSObject对象具有所有权,不再使用时候他们应该负责放弃对象所有权,当对象没有所有者时,引用计数为0,它才可以被释放。 如上图如果按照对象所有权解释:调用者A创建或拷贝NSObject对象,这时调用者A就具有了NSObject对象的所有权,见第①步。调用者B调用NSObject对象retain方法,就获得了也NSObject对象的所有权,见第②步。调用者A调用NSObject对象release方法,放弃NSObject对象的所有权,见第③步。调用者C只是使用NSObject对象没有获得NSObject对象的所有权,见第④步。调用者B调用NSObject对象release方法,放弃NSObject对象的所有权,见第⑤步,但是调用者B使用这个NSObject对象过程中,由于其他调用者放弃所有权,导致NSObject对象被释放,那么调用者B中程序就会发生运行期错误。 本文转自 tony关东升 51CTO博客,原文链接:http://blog.51cto.com/tonyguan/1748666,如需转载请自行联系原作者

优秀的个人博客,低调大师

Hadoop MapReduce概念学习系列之如何进行DeBug调试(二十五)

写程序几乎一大半的时间是调试,分布式程序调试的成本更高。 那么分布式的代码程序该如何调试呢?下面我们一起来 MapReduce 代码如何使用 Debug 来调试。 仍然以美国气象站为例子。 MapReduce 的Debug 调试 这里我们以 Temperature 为例 1、 在Temperature.java里进行改动 改动地方1 : 改为, 我们通过数组来传入输入路径和输出路径。 改动地方2: 改为, 2 、开启hadoop集群启动和链接 3 、在Temperature.java里的Map函数中打入断点,以及在Reducer函数中打入断点。 4、右键,debug as -> 1 Java Application ,来进行断点调试 5、出现以下界面 首先在Map函数里 6、查看key 、value 。知识点(系统键+Prt Sc Sys Rq进行抓图) 7、按F6,然后查看temperature值 8、按F6,查看weatherStationId 和 temperature 9 、点击 10 、再次查看key和value 11、按F6,查看temperature 12 、按F6,查看weatherStationId 13 、将Map中的断点放开,下面到Reducer 再到Reducer函数里 14 、Reducer中去,查看key和value 15、按下F6,查看sum 16 、然后,自己可以加断点,也可以放开断点,按F6往下走。 最后,全部放开,各部分调试都没问题, 17、返回界面 18 查看 即表明,03103气象站的平均气温是82. 其实 程序进入debug调试后,后续的调试步骤跟 Eclipse 调试 java程序是一样的,这里就不再赘述。 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5078045.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Storm概念学习系列 之数据流模型、Storm数据流模型

数据流模型 数据流模型是由数据流、数据处理任务、数据节点、数据处理任务实例等构成的一种数据模型。本节将介绍的数据流模型如图1所示。 分布式流处理系统由多个数据处理节点(node)组成,每个数据处理节点上运行有多个数据任务实例,每个数据任务实例属于一个数据任务定义。任务实例是在任务定义的基础上,添加了输入流过滤条件和强制输出周期属性后,可实际推送到数据处理节点上运行的逻辑实体;数据任务定义包含输入数据流、数据处理逻辑和输出数据流属性。 数据流模型简介 首先介绍数据流模型中的一些重要概念。1. 数据流 数据流是时间分布和数量上无限的一系列数据记录的集合体。数据记录是数据流的最小组成单元,每条数据记录包括三类数据:数据流名称(stream name)、标识数据(key)和具体数据处理逻辑所需的数据(value)。 图1 数据流处理流程图 2. 定义数据处理任务 定义数据处理任务只是定义一个数据处理任务的基本属性,任务还无法直接执行,必须将其实现为具体的任务实例。数据处理任务的基本属性包括输入流、输出流和数据处理逻辑。 (1)输入流(可选) 输入流描述该任务依赖哪些数据流作为输入,是一个数据流名称列表;数据流产生源不会依赖其他数据流,可忽略该配置。 (2)输出流(可选) 输出流描述该任务产生哪个数据流,是一个数据流名称;数据流处理链末级任务不会产生新的数据流,可忽略该配置。 (3)数据处理逻辑 数据处理逻辑描述该任务具体的处理逻辑,如由独立进程执行的外部处理逻辑。 3. 数据处理节点 数据处理节点是可容纳多个数据处理任务实例运行的实体机器,每个数据处理节点的IPv4地址必须保证唯一。 4. 数据处理任务实例 对一个数据处理任务定义进行具体约束后,可将其推送到某个处理节点上运行具体的逻辑实体。数据处理任务基本属性包括数据处理任务定义、输入流过滤条件、强制输出周期,下面进行具体介绍。 (1)数据处理任务定义 数据处理任务定义指向该任务实例对应的数据处理任务定义实体。 (2)输入流过滤条件 输入流过滤条件是一个布尔类型表达式列表,描述每个输入流中符合什么条件的数据记录可以作为有效数据交给处理逻辑。若某个输入流中所有数据记录都是有效数据,则可直接用true表示。 (3)强制输出周期(可选) 强制输出周期描述以什么频率强制该任务实例产生输出流记录,可以用输入流记录数或间隔时间作为周期。如果忽略该配置,则输出流记录产生周期完全由处理逻辑自身决定,不受框架约束。 Storm数据流模型 数据流(Stream)是Storm中对数据进行的抽象,它是时间上无界的Tuple元组序列。在Topology中,Spout是Stream的源头,负责为Topology从特定数据源发射Stream(Spout并不需要接收流,只会发射流);Bolt可以接收任意多个流作为输入,然后进行数据的加工处理过程,如果需要,Bolt还可以发射出新的流给下级Bolt处理。Topology内部Spout和Bolt之间的数据流关系图如图2所示。 图2 Spout和Bolt中的数据流关系图 Topology中每一个计算组件(Spout和Bolt)都有一个并行执行度(Task),在创建Topology时可以指定,Storm会在集群内分配对应并行度个数的线程来同时执行这一组件。Storm提供了若干种数据流分发(Stream Grouping)策略来解决在两个组件(Spout和Bolt)之间发送Tuple。在定义Topology时,需要为每个Bolt指定接收什么样的流作为其输入。 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5989745.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Storm概念学习系列之Stream消息流 和 Stream Grouping 消息流组

Stream消息流是Storm中最关键的抽象,是一个没有边界的Tuple序列。 Stream Grouping 消息流组是用来定义一个流如何分配到Tuple到Bolt。 Stream消息流和Stream Grouping消息流组 Storm核心的抽象概念是“流”。流是一个分布式并行创建和处理的无界的连续元组(Tuple)。流通过给流元组中字段命名来定义。在默认情况下,元组可以包含整型、长整型、短整型、字节、字符串、双精度浮点数、单精度浮点数、布尔型和字节数组。 Stream消息流Stream消息流是Storm中最关键的抽象,是一个没有边界的Tuple序列,这些Tuple以分布式的方式并行地创建和处理。源源不断传递的元组Tuple就组成了流Stream,定义消息流主要是定义消息流中的Tuple。Tuple的定义在前面的博文已经做了详细介绍,本博文不再累述。消息流Tuple中的每个字段都有一个名字,并且不同Tuple对应字段的类型必须相同。两个Tuple的第一个字段的类型必须相同,第二个字段的类型必须相同,但是第一个字段和第二个字段可以有不同的类型。 每个消息流在定义时都会分配一个ID,因为单向消息流很普遍,OutputFieldsDeclarer定义了一些方法可以定义一个流而不用指定其ID。在这种情况下,该流有一个默认的ID。 Storm将每个待处理或者新产生的Tuple封装成“消息”,而一个消息流(Stream)则是一个没有边界的Tuple序列,而这些Tuple会以一种分布式的方式被并行地创建和处理。 Stream Grouping消息流组 定义Topology的其中一步是定义每个Bolt接受何种流作为输入。Stream Grouping(消息流组)就是用来定义一个流如何分配Tuple到Bolt。Storm包括6种流分组类型。 1)随机分组(Shuffle Grouping):随机分发元组到Bolt的任务,保证每个任务获得相等数量的元组。 2)字段分组(Fields Grouping):根据指定字段分割数据流并分组。例如,根据“user-id”字段,具有该字段的Tuple被分到相同的Bolt,不同的“user-id”值则会被分配到不同的Bolt。 3)全部分组(All Grouping):对于每一个Tuple来说,所有的Bolt都会收到,所有的Tuple被复制到Bolt的所有任务上,需小心使用该分组。 4)全局分组(Global Grouping):全部的流都分配到Bolt的同一个任务,就是分配给ID最小的Task。 5)无分组(None Grouping):不分组的含义是,流不关心到底谁会收到它的Tuple。目前无分组等效于随机分组,不同的是Storm将把无分组的Bolt放到订阅Bolt或Spout的同一线程中执行(在可能实现的前提下)。 6)直接分组(Direct Grouping):这是一个特别的分组类型。元组生产者决定元组由哪个元组消费者任务接收。该分组仅能被声明为direct stream的流使用。元组必须通过emitDirect方法直接发射。Bolt获取消费者任务ID,可以使用已提供的TopologyContext类或保持引用OutputCollector类的emit方法的输出,元组发送后返回任务ID列表。 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5989723.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

《从零开始学Swift》学习笔记(Day 20)——函数中参数的传递引用

参数的传递引用 类是引用类型,其他的数据类型如整型、浮点型、布尔型、字符、字符串、元组、集合、枚举和结构体全部是值类型。 有的时候就是要将一个值类型参数以引用方式传递,这也是可以实现的,Swift提供的inout关键字就可以实现。看下面的一个示例: 1 2 3 4 5 6 7 8 9 10 11 funcincrement(inoutvalue:Double,amount:Double= 1.0 ){ value+=amount } varvalue:Double= 10.0 increment(&value) print(value) increment(&value,amount: 100.0 ) print(value) 代码increment(&value)是调用函数increment,增长量是默认值,其中&value(在变量前面加&符号,取出value地址)是传递引用方式,它在定义函数时,参数标识与inout是相互对应的。 代码increment(&value,amount:100.0)也是调用函数increment,增长量是100.0。 上述代码输出结果如下: 11.0 111.0 本文转自 tony关东升 51CTO博客,原文链接:http://blog.51cto.com/tonyguan/1746249,如需转载请自行联系原作者

优秀的个人博客,低调大师

Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)

1、rdd持久化 2、广播 3、累加器 1、rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$sbin/start-dfs.sh 启动spark集群 spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$sbin/start-all.sh 启动spark-shell spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g reduce scala>sc res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3bcc8f13 scala> val numbers = sc.parallelize <console>:21: error: missing arguments for method parallelize in class SparkContext; follow this method with `_' if you want to treat it as a partially applied function val numbers = sc.parallelize ^ scala>val numbers = sc.parallelize(1 to 100) numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21 scala>numbers.reduce(_+_) took 11.790246 s res1: Int = 5050 可见,reduce是个action。 scala>val result = numbers.map(2*_) result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23 scala>val data = result.collect reduce源码 /** * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */ def reduce(f: (T, T) => T): T = withScope { val cleanF = sc.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { if (iter.hasNext) { Some(iter.reduceLeft(cleanF)) } else { None } } var jobResult: Option[T] = None val mergeResult = (index: Int, taskResult: Option[T]) => { if (taskResult.isDefined) { jobResult = jobResult match { case Some(value) => Some(f(value, taskResult.get)) case None => taskResult } } } sc.runJob(this, reducePartition, mergeResult) // Get the final result out of our Option, or throw an exception if the RDD was empty jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) } 可见,这也是一个action操作。 collect data: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200) scala> collect源码 /** * Return an array that contains all of the elements in this RDD. */ def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } 可见,这也是一个action操作。 从收集结果的角度来说,如果想要在命令行终端中,看到执行结果,就必须collect。 从源码的角度来说,凡是action级别的操作,都会触发sc.rubJob。这点,spark里是一个应用程序允许有多个Job,而hadoop里一个应用程序只能一个Job。 count scala>numbers res2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21 scala>1 to 100 res3: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100) scala>numbers.count took 0.649005 s res4: Long = 100 count源码 /** * Return the number of elements in the RDD. */ def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum 可见,这也是一个action操作。 take scala>val topN = numbers.take(5) topN: Array[Int] = Array(1, 2, 3, 4, 5) take源码 /** * Take the first num elements of the RDD. It works by first scanning one partition, and use the * results from that partition to estimate the number of additional partitions needed to satisfy * the limit. * * @note due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */ def take(num: Int): Array[T] = withScope { if (num == 0) { new Array[T](0) } else { val buf = new ArrayBuffer[T] val totalParts = this.partitions.length var partsScanned = 0 while (buf.size < num && partsScanned < totalParts) { // The number of partitions to try in this iteration. It is ok for this number to be // greater than totalParts because we actually cap it at totalParts in runJob. var numPartsToTry = 1 if (partsScanned > 0) { // If we didn't find any rows after the previous iteration, quadruple and retry. // Otherwise, interpolate the number of partitions we need to try, but overestimate // it by 50%. We also cap the estimation in the end. if (buf.size == 0) { numPartsToTry = partsScanned * 4 } else { // the left side of max is >=1 whenever partsScanned >= 2 numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1) numPartsToTry = Math.min(numPartsToTry, partsScanned * 4) } } val left = num - buf.size val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p) res.foreach(buf ++= _.take(num - buf.size)) partsScanned += numPartsToTry } buf.toArray } } 可见,这也是一个action操作。 countByKey scala>val scores = Array(Tuple2(1,100),Tuple2(1,100),Tuple2(2,100),Tuple2(2,100),Tuple2(3,100)) scores: Array[(Int, Int)] = Array((1,100), (1,100), (2,100), (2,100), (3,100)) scala> val content = sc.parallelize <console>:21: error: missing arguments for method parallelize in class SparkContext; follow this method with `_' if you want to treat it as a partially applied function val content = sc.parallelize ^ scala>val content = sc.parallelize(scores) content: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:23 scala>val data = content.countByKey() took 10.556634 s data: scala.collection.Map[Int,Long] = Map(2 -> 2, 1 -> 2, 3 -> 1) countByKey源码 /** * Count the number of elements for each key, collecting the results to a local Map. * * Note that this method should only be used if the resulting map is expected to be small, as * the whole thing is loaded into the driver's memory. * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */ def countByKey(): Map[K, Long] = self.withScope { self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap } 可见,这也是一个action操作。 saveAsTextFile 之前,在 rdd实战(rdd基本操作实战及transformation和action流程图)(源码) scala>val partitionsReadmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt") 这里呢。 scala>val partitionsReadmeRdd = sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("/partition1README.txt") scala>val partitionsReadmeRdd = sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("/partition1README.txt") saveAsTextFile源码 /** * Save this RDD as a text file, using string representations of elements. */ def saveAsTextFile(path: String): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]` // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an // Ordering for `NullWritable`. That's why the compiler will generate different anonymous // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. // // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate // same bytecodes for `saveAsTextFile`. val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] val r = this.mapPartitions { iter => val text = new Text() iter.map { x => text.set(x.toString) (NullWritable.get(), text) } } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) } /** * Save this RDD as a compressed text file, using string representations of elements. */ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] val r = this.mapPartitions { iter => val text = new Text() iter.map { x => text.set(x.toString) (NullWritable.get(), text) } } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec) } saveAsTextFile不仅,可保存在集群里,也可以保存到本地,这就要看hadoop的运行模式。 由此可见,它也是个action操作。 以上是rdd持久化的第一个方面,就是action级别的操作。 rdd持久化的第二个方面,就是通过persist。为什么在spark里,随处可见persist的身影呢? 原因一:spark在默认情况下,数据是放在内存中,适合高速迭代。比如在一个stage里,有1000个步骤,它其实只在第1个步骤输入数据,在第1000个步骤输出数据,在中间不产生临时数据。但是,分布式系统,分享非常高,所以,容出错,设计到容错。 由于,rdd是有血统继承关系的,即lineager。如果后面的rdd数据分片出错了或rdd本身出错了,则,可根据其前面依赖的lineager,算出来。 但是,假设1000个步骤,如果之前,没有父rdd进行persist或cache的话,则要重头开始了。亲! 什么时候,该persist? 1、在某个步骤非常费时的情况下,不好使 (手动) 2、计算链条特别长的情况下 (手动) 3、checkpoint所在的rdd也一定要持久化数据 (注意:在checkpoint之前,进行persist) (手动) checkpoint是rdd的算子, 先写,某个具体rdd.checkpoint 或 某个具体rdd.cache ,再写, 某个具体rdd.persist 4、shuffle之后 (因为shuffle之后,要网络传输,风险大) (手动) 5、shuffle之前 (框架,默认给我们做的,把数据持久化到本地磁盘) checkpoint源码 /** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with `SparkContext#setCheckpointDir` and all references to its parent * RDDs will be removed. This function must be called before any job has been * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */ def checkpoint(): Unit = RDDCheckpointData.synchronized { // NOTE: we use a global lock here due to complexities downstream with ensuring // children RDD partitions point to the correct parent partitions. In the future // we should revisit this consideration. if (context.checkpointDir.isEmpty) { throw new SparkException("Checkpoint directory has not been set in the SparkContext") } else if (checkpointData.isEmpty) { checkpointData = Some(new ReliableRDDCheckpointData(this)) } } persist源码 /** * Mark this RDD for persisting using the specified level. * * @param newLevel the target storage level * @param allowOverride whether to override any existing level with the new one */ private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } // If this is the first time this RDD is marked for persisting, register it // with the SparkContext for cleanups and accounting. Do this only once. if (storageLevel == StorageLevel.NONE) { sc.cleaner.foreach(_.registerRDDForCleanup(this)) sc.persistRDD(this) } storageLevel = newLevel this } /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not * have a storage level set yet. Local checkpointing is an exception. */ def persist(newLevel: StorageLevel): this.type = { if (isLocallyCheckpointed) { // This means the user previously called localCheckpoint(), which should have already // marked this RDD for persisting. Here we should override the old storage level with // one that is explicitly requested by the user (after adapting it to use disk). persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true) } else { persist(newLevel, allowOverride = false) } } /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist() StorageLevel里有很多类型 这里,牵扯到序列化。 问,为什么要序列化? 答:节省空间,减少体积。内存不够时,把MEMORY中的数据,进行序列化。 当然,也有不好一面,序列化时,会反序列化,反序列化耗cpu。 MEMORY_AND_DISK 假设,我们制定数据存储方式是,MEMORY_AND_DISK。则,是不是同时,存储到内存和磁盘呢? 答:不是啊,亲。spark一定是优先考虑内存的啊,只要内存足够,不会考虑磁盘。若内存不够了,则才放部分数据到磁盘。 极大地减少数据丢失概率发生。 MEMORY_ONLY 假设,我们制定数据存储方式是,MEMORY_ONLY。则,只放到内存。当内存不够了,会出现OOM。或数据丢失。 OFF_HEAP 这牵扯到Tachyon,基于内存的分布式系统 为什么有2分副本?好处是? 假设,一个计算特别耗时,而且,又是基于内存,如果其中一份副本崩溃掉,则可迅速切换到另一份副本去计算。这就是“空间换时间”!非常重要 这不是并行计算,这是计算后的结果,放2份副本。 scala>val partitionsReadmeRdd = sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).count took 6.270138 s scala>val partitionsReadmeRdd = sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).cache.count took 4.147545 s scala>val partitionsReadmeRdd = sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).cache.count took 4.914212 s scala>val cacheRdd = sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).cache scala>cacheRdd.count took 3.371621 scala>val cacheRdd = sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).cache scala>cacheRdd.count took 0.943499 s 我的天啊! scala>sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).cache.count took 5.603903 scala>sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).cache.count took 4.146627 scala>sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).cache.count took 3.071122 cache之后,一定不能立即有其他算子! 实际工程中, cache之后,如果有其他算子,则会,重新触发这个工作过程。 注意:cache,不是action cache缓存,怎么让它失效? 答:unpersist persist是lazy级别的,unpersist是eager级别的。cache是persist的一个特殊情况。 cache和persist的区别? 答:persist可以放到磁盘、放到内存、同时放到内存和磁盘。以及多份副本 cache只能放到内存,以及只能一份副本。 persisit在内存不够时,保存在磁盘的哪个目录? 答:local的process。 好的,以上是,rdd持久化的两个方面。 rdd持久化的第一个方面,就是常用的action级别的操作。 rdd持久化的第二个方面,就是持久化的不同方式,以及它内部的运行情况 小知识:cache之后,一定不能立即有其他算子!实际工程中, cache之后,如果有其他算子,则会,重新触发这个工作过程。 一般都不会跨机器抓内存,宁愿排队。宁愿数据不动代码动。 2、广播 为什么要有,rdd广播? 答:大变量、join、冗余、减少数据移动、通信、状态、集群消息、共享、网络传输慢要提前、数据量大耗网络、减少通信、要同步。 为什么大变量,需要广播呢? 答:原因是,每个task运行,读取全集数据时,task本身执行时,每次都要拷贝一份数据副本,如果变量比较大,如一百万,则要拷贝一百万。 (2)广播(线程中共享)不必每个task都拷贝一份副本,因为它是全局唯一的,极大的减少oom,减少通信,冗余、共享变量等。广播是将数据广播到Executor的内存中,其内部所以的任务都会只享有全局唯一的变量,减少网络传输。 text在读取数据时候,拷贝一份的数据副本(变量),因为函数式编程(变量不变),不拷贝状态容易被改变,数据量小(1、引用较小2、数据本身小),变量大容易产生oom(task拷贝数据 在内存中运行),网络传输慢,要提前,冗余、共享,减少通信。 广播变量: 广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。 (本段摘自:http://blog.csdn.net/happyanger6/article/details/46576831) 广播工作机制图 广播工作机制图 参考: http://blog.csdn.net/kxr0502/article/details/50574561 问:读广播,会消耗网络传输吗? 答:不消耗,广播是放在内存中。读取它,不消耗。 问:广播变量是不是就是向每一个executor,广播一份数据,而不是向每一个task,广播一份数据?这样对吗? 答:对 广播是由Driver发给当前Application分配的所有Executor内存级别的全局只读变量,Executor中的线程池中的线程共享该全局变量,极大的减少了网络传输(否则的话每个Task都要传输一次该变量)并极大的节省了内存,当然也隐形的提高的CPU的有效工作。 实战创建广播: scala>val number = 10 number: Int = 10 scala>val broadcastNumber = sc.broadcast(number) 16/09/29 17:26:47 INFO storage.MemoryStore: ensureFreeSpace(40) called with curMem=1782734, maxMem=560497950 16/09/29 17:26:47 INFO storage.MemoryStore: Block broadcast_38 stored as values in memory (estimated size 40.0 B, free 532.8 MB) 16/09/29 17:26:48 INFO storage.MemoryStore: ensureFreeSpace(97) called with curMem=1782774, maxMem=560497950 16/09/29 17:26:48 INFO storage.MemoryStore: Block broadcast_38_piece0 stored as bytes in memory (estimated size 97.0 B, free 532.8 MB) 16/09/29 17:26:48 INFO storage.BlockManagerInfo: Added broadcast_38_piece0 in memory on 192.168.80.128:40914 (size: 97.0 B, free: 534.4 MB) 16/09/29 17:26:48 INFO spark.SparkContext: Created broadcast 38 from broadcast at <console>:23broadcastNumber: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(38) scala> val data = sc.parallelize <console>:21: error: missing arguments for method parallelize in class SparkContext; follow this method with `_' if you want to treat it as a partially applied function val data = sc.parallelize ^ scala>val data = sc.parallelize(1 to 100) data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[61] at parallelize at <console>:21 scala>val bn = data.map(_* broadcastNumber.value) bn: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[62] at map at <console>:27 scala> 我们知道,是test是要广播变量,但,我们编程,对rdd。 //通过在一个变量v上调用SparkContext.broadcast(v)可以创建广播变量。广播变量是围绕着v的封装,可以通过value方法访问这个变量。 问:广播变量里有很多变量吗? 答:当然可以有很多,用java bin或scala封装,就可以了。 如,在这里。广播变量是,broadcastNumber, 里,有变量value等。 scala>val broadcastNumber = sc.broadcast(number) scala>val bn = data.map(_* broadcastNumber.value) scala>bn.collect res12: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240, 250, 260, 270, 280, 290, 300, 310, 320, 330, 340, 350, 360, 370, 380, 390, 400, 410, 420, 430, 440, 450, 460, 470, 480, 490, 500, 510, 520, 530, 540, 550, 560, 570, 580, 590, 600, 610, 620, 630, 640, 650, 660, 670, 680, 690, 700, 710, 720, 730, 740, 750, 760, 770, 780, 790, 800, 810, 820, 830, 840, 850, 860, 870, 880, 890, 900, 910, 920, 930, 940, 950, 960, 970, 980, 990, 1000) scala> 由此,可见,通过机制、流程图和实战,深度剖析对广播全面详解! broadcast源码分析 参考:http://www.cnblogs.com/seaspring/p/5682053.html BroadcastManager源码 /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark.broadcast import java.util.concurrent.atomic.AtomicLong import scala.reflect.ClassTag import org.apache.spark._ import org.apache.spark.util.Utils private[spark] class BroadcastManager( val isDriver: Boolean, conf: SparkConf, securityManager: SecurityManager) extends Logging { private var initialized = false private var broadcastFactory: BroadcastFactory = null initialize() // Called by SparkContext or Executor before using Broadcast private def initialize() { synchronized { if (!initialized) { val broadcastFactoryClass = conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory") broadcastFactory = Utils.classForName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory] // Initialize appropriate BroadcastFactory and BroadcastObject broadcastFactory.initialize(isDriver, conf, securityManager) initialized = true } } } def stop() { broadcastFactory.stop() } private val nextBroadcastId = new AtomicLong(0) def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = { broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement()) } def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { broadcastFactory.unbroadcast(id, removeFromDriver, blocking) } } Broadcast源码 /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark.broadcast import java.io.Serializable import org.apache.spark.SparkException import org.apache.spark.Logging import org.apache.spark.util.Utils import scala.reflect.ClassTag /** * A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable * cached on each machine rather than shipping a copy of it with tasks. They can be used, for * example, to give every node a copy of a large input dataset in an efficient manner. Spark also * attempts to distribute broadcast variables using efficient broadcast algorithms to reduce * communication cost. * * Broadcast variables are created from a variable `v` by calling * [[org.apache.spark.SparkContext#broadcast]]. * The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the * `value` method. The interpreter session below shows this: * * {{{ * scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) * broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) * * scala> broadcastVar.value * res0: Array[Int] = Array(1, 2, 3) * }}} * * After the broadcast variable is created, it should be used instead of the value `v` in any * functions run on the cluster so that `v` is not shipped to the nodes more than once. * In addition, the object `v` should not be modified after it is broadcast in order to ensure * that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped * to a new node later). * * @param id A unique identifier for the broadcast variable. * @tparam T Type of the data contained in the broadcast variable. */ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging { /** * Flag signifying whether the broadcast variable is valid * (that is, not already destroyed) or not. */ @volatile private var _isValid = true private var _destroySite = "" /** Get the broadcasted value. */ def value: T = { assertValid() getValue() } /** * Asynchronously delete cached copies of this broadcast on the executors. * If the broadcast is used after this is called, it will need to be re-sent to each executor. */ def unpersist() { unpersist(blocking = false) } /** * Delete cached copies of this broadcast on the executors. If the broadcast is used after * this is called, it will need to be re-sent to each executor. * @param blocking Whether to block until unpersisting has completed */ def unpersist(blocking: Boolean) { assertValid() doUnpersist(blocking) } /** * Destroy all data and metadata related to this broadcast variable. Use this with caution; * once a broadcast variable has been destroyed, it cannot be used again. * This method blocks until destroy has completed */ def destroy() { destroy(blocking = true) } /** * Destroy all data and metadata related to this broadcast variable. Use this with caution; * once a broadcast variable has been destroyed, it cannot be used again. * @param blocking Whether to block until destroy has completed */ private[spark] def destroy(blocking: Boolean) { assertValid() _isValid = false _destroySite = Utils.getCallSite().shortForm logInfo("Destroying %s (from %s)".format(toString, _destroySite)) doDestroy(blocking) } /** * Whether this Broadcast is actually usable. This should be false once persisted state is * removed from the driver. */ private[spark] def isValid: Boolean = { _isValid } /** * Actually get the broadcasted value. Concrete implementations of Broadcast class must * define their own way to get the value. */ protected def getValue(): T /** * Actually unpersist the broadcasted value on the executors. Concrete implementations of * Broadcast class must define their own logic to unpersist their own data. */ protected def doUnpersist(blocking: Boolean) /** * Actually destroy all data and metadata related to this broadcast variable. * Implementation of Broadcast class must define their own logic to destroy their own * state. */ protected def doDestroy(blocking: Boolean) /** Check if this broadcast is valid. If not valid, exception is thrown. */ protected def assertValid() { if (!_isValid) { throw new SparkException( "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite)) } } override def toString: String = "Broadcast(" + id + ")" } 其他的,不一一赘述了。 3、累加器 为什么需要,累加器? 答:第一种情况,是,test把数据副本运行起来。 第二种情况,有全局变量和局部变量,有了广播,为什么还需要累加器? (3)累加器(获取全局唯一的状态对象,SparkContext创建,被Driver控制,在Text实际运行的时候,每次都可以保证修改之后获取全局唯一的对象,Driver中可读,Executor可读) 累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持) 累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者"+="方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。 累加器的特征:全局的,Accumulator:对于Executor只能修改但不可读,只对Driver可读(因为通过Driver控制整个集群的状态),不同的executor 修改,不会彼此覆盖(枷锁机制) 累加器实战: scala>val sum = sc.accumulator(0) sum: org.apache.spark.Accumulator[Int] = 0 scala>val data = sc.parallelize(1 to 100) data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at parallelize at <console>:21 scala>val result = data.foreach(item =>sum += item) took 6.548568 s result: Unit = () scala> println(sum) 5050 累加器在记录集群全局唯一的状态的时候极其重要,保持唯一的全局状态的变量,所以其重要性不言而喻。 Driver中取值,Executor中计算, 1、累计器全局(全集群)唯一,只增不减(Executor中的task去修改,即累加); 2、累加器是Executor共享; 我的理解应该是对的,集群全局变量,谁操作,从driver上拿去操作,然后下个Executor在用的时候,拿上个Executor执行的结果,也就是从Driver那里拿。 accumulator源码 /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark import java.io.{ObjectInputStream, Serializable} import scala.collection.generic.Growable import scala.collection.Map import scala.collection.mutable import scala.ref.WeakReference import scala.reflect.ClassTag import org.apache.spark.serializer.JavaSerializer import org.apache.spark.util.Utils /** * A data type that can be accumulated, ie has an commutative and associative "add" operation, * but where the result type, `R`, may be different from the element type being added, `T`. * * You must define how to add data, and how to merge two of these together. For some data types, * such as a counter, these might be the same operation. In that case, you can use the simpler * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are * accumulating a set. You will add items to the set, and you will union two sets together. * * @param initialValue initial value of accumulator * @param param helper object defining how to add elements of type `R` and `T` * @param name human-readable name for use in Spark's web UI * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported * to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be * thread safe so that they can be reported correctly. * @tparam R the full accumulated data (result type) * @tparam T partial data that can be added in */ class Accumulable[R, T] private[spark] ( @transient initialValue: R, param: AccumulableParam[R, T], val name: Option[String], internal: Boolean) extends Serializable { private[spark] def this( @transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = { this(initialValue, param, None, internal) } def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = this(initialValue, param, name, false) def this(@transient initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None) val id: Long = Accumulators.newId @volatile @transient private var value_ : R = initialValue // Current value on master val zero = param.zero(initialValue) // Zero value to be passed to workers private var deserialized = false Accumulators.register(this) /** * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be * reported correctly. */ private[spark] def isInternal: Boolean = internal /** * Add more data to this accumulator / accumulable * @param term the data to add */ def += (term: T) { value_ = param.addAccumulator(value_, term) } /** * Add more data to this accumulator / accumulable * @param term the data to add */ def add(term: T) { value_ = param.addAccumulator(value_, term) } /** * Merge two accumulable objects together * * Normally, a user will not want to use this version, but will instead call `+=`. * @param term the other `R` that will get merged with this */ def ++= (term: R) { value_ = param.addInPlace(value_, term)} /** * Merge two accumulable objects together * * Normally, a user will not want to use this version, but will instead call `add`. * @param term the other `R` that will get merged with this */ def merge(term: R) { value_ = param.addInPlace(value_, term)} /** * Access the accumulator's current value; only allowed on master. */ def value: R = { if (!deserialized) { value_ } else { throw new UnsupportedOperationException("Can't read accumulator value in task") } } /** * Get the current value of this accumulator from within a task. * * This is NOT the global value of the accumulator. To get the global value after a * completed operation on the dataset, call `value`. * * The typical use of this method is to directly mutate the local value, eg., to add * an element to a Set. */ def localValue: R = value_ /** * Set the accumulator's value; only allowed on master. */ def value_= (newValue: R) { if (!deserialized) { value_ = newValue } else { throw new UnsupportedOperationException("Can't assign accumulator value in task") } } /** * Set the accumulator's value; only allowed on master */ def setValue(newValue: R) { this.value = newValue } // Called by Java when deserializing an object private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() value_ = zero deserialized = true // Automatically register the accumulator when it is deserialized with the task closure. // // Note internal accumulators sent with task are deserialized before the TaskContext is created // and are registered in the TaskContext constructor. Other internal accumulators, such SQL // metrics, still need to register here. val taskContext = TaskContext.get() if (taskContext != null) { taskContext.registerAccumulator(this) } } override def toString: String = if (value_ == null) "null" else value_.toString } /** * Helper object defining how to accumulate values of a particular type. An implicit * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type. * * @tparam R the full accumulated data (result type) * @tparam T partial data that can be added in */ trait AccumulableParam[R, T] extends Serializable { /** * Add additional data to the accumulator value. Is allowed to modify and return `r` * for efficiency (to avoid allocating objects). * * @param r the current value of the accumulator * @param t the data to be added to the accumulator * @return the new value of the accumulator */ def addAccumulator(r: R, t: T): R /** * Merge two accumulated values together. Is allowed to modify and return the first value * for efficiency (to avoid allocating objects). * * @param r1 one set of accumulated data * @param r2 another set of accumulated data * @return both data sets merged together */ def addInPlace(r1: R, r2: R): R /** * Return the "zero" (identity) value for an accumulator type, given its initial value. For * example, if R was a vector of N dimensions, this would return a vector of N zeroes. */ def zero(initialValue: R): R } private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T] extends AccumulableParam[R, T] { def addAccumulator(growable: R, elem: T): R = { growable += elem growable } def addInPlace(t1: R, t2: R): R = { t1 ++= t2 t1 } def zero(initialValue: R): R = { // We need to clone initialValue, but it's hard to specify that R should also be Cloneable. // Instead we'll serialize it to a buffer and load it back. val ser = new JavaSerializer(new SparkConf(false)).newInstance() val copy = ser.deserialize[R](ser.serialize(initialValue)) copy.clear() // In case it contained stuff copy } } /** * A simpler value of [[Accumulable]] where the result type being accumulated is the same * as the types of elements being merged, i.e. variables that are only "added" to through an * associative operation and can therefore be efficiently supported in parallel. They can be used * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric * value types, and programmers can add support for new types. * * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]]. * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator. * However, they cannot read its value. Only the driver program can read the accumulator's value, * using its value method. * * The interpreter session below shows an accumulator being used to add up the elements of an array: * * {{{ * scala> val accum = sc.accumulator(0) * accum: spark.Accumulator[Int] = 0 * * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) * ... * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s * * scala> accum.value * res2: Int = 10 * }}} * * @param initialValue initial value of accumulator * @param param helper object defining how to add elements of type `T` * @tparam T result type */ class Accumulator[T] private[spark] ( @transient private[spark] val initialValue: T, param: AccumulatorParam[T], name: Option[String], internal: Boolean) extends Accumulable[T, T](initialValue, param, name, internal) { def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = { this(initialValue, param, name, false) } def this(initialValue: T, param: AccumulatorParam[T]) = { this(initialValue, param, None, false) } } /** * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be * available when you create Accumulators of a specific type. * * @tparam T type of value to accumulate */ trait AccumulatorParam[T] extends AccumulableParam[T, T] { def addAccumulator(t1: T, t2: T): T = { addInPlace(t1, t2) } } object AccumulatorParam { // The following implicit objects were in SparkContext before 1.2 and users had to // `import SparkContext._` to enable them. Now we move them here to make the compiler find // them automatically. However, as there are duplicate codes in SparkContext for backward // compatibility, please update them accordingly if you modify the following implicit objects. implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double): Double = 0.0 } implicit object IntAccumulatorParam extends AccumulatorParam[Int] { def addInPlace(t1: Int, t2: Int): Int = t1 + t2 def zero(initialValue: Int): Int = 0 } implicit object LongAccumulatorParam extends AccumulatorParam[Long] { def addInPlace(t1: Long, t2: Long): Long = t1 + t2 def zero(initialValue: Long): Long = 0L } implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { def addInPlace(t1: Float, t2: Float): Float = t1 + t2 def zero(initialValue: Float): Float = 0f } // TODO: Add AccumulatorParams for other types, e.g. lists and strings } // TODO: The multi-thread support in accumulators is kind of lame; check // if there's a more intuitive way of doing it right private[spark] object Accumulators extends Logging { /** * This global map holds the original accumulator objects that are created on the driver. * It keeps weak references to these objects so that accumulators can be garbage-collected * once the RDDs and user-code that reference them are cleaned up. */ val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]() private var lastId: Long = 0 def newId(): Long = synchronized { lastId += 1 lastId } def register(a: Accumulable[_, _]): Unit = synchronized { originals(a.id) = new WeakReference[Accumulable[_, _]](a) } def remove(accId: Long) { synchronized { originals.remove(accId) } } // Add values to the original accumulators with some given IDs def add(values: Map[Long, Any]): Unit = synchronized { for ((id, value) <- values) { if (originals.contains(id)) { // Since we are now storing weak references, we must check whether the underlying data // is valid. originals(id).get match { case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value case None => throw new IllegalAccessError("Attempted to access garbage collected Accumulator.") } } else { logWarning(s"Ignoring accumulator update for unknown accumulator id $id") } } } } private[spark] object InternalAccumulator { val PEAK_EXECUTION_MEMORY = "peakExecutionMemory" val TEST_ACCUMULATOR = "testAccumulator" // For testing only. // This needs to be a def since we don't want to reuse the same accumulator across stages. private def maybeTestAccumulator: Option[Accumulator[Long]] = { if (sys.props.contains("spark.testing")) { Some(new Accumulator( 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true)) } else { None } } /** * Accumulators for tracking internal metrics. * * These accumulators are created with the stage such that all tasks in the stage will * add to the same set of accumulators. We do this to report the distribution of accumulator * values across all tasks within each stage. */ def create(sc: SparkContext): Seq[Accumulator[Long]] = { val internalAccumulators = Seq( // Execution memory refers to the memory used by internal data structures created // during shuffles, aggregations and joins. The value of this accumulator should be // approximately the sum of the peak sizes across all such data structures created // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort. new Accumulator( 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true) ) ++ maybeTestAccumulator.toSeq internalAccumulators.foreach { accumulator => sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator)) } internalAccumulators } } 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5914696.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

《从零开始学Swift》学习笔记(Day 57)——Swift编码规范之注释规范:

前面说到Swift注释的语法有两种:单行注释(//)和多行注释(/*...*/)。这里来介绍一下他们的使用规范。 文件注释 文件注释就在每一个文件开头添加注释,文件注释通常包括如下信息:版权信息、文件名、所在模块、作者信息、历史版本信息、文件内容和作用等。 下面看一个文件注释的示例: 1 2 3 4 5 6 7 8 9 10 11 12 /* Copyright(C)2015EorientInc.AllRightsReserved. SeeLICENSE.txtforthissample’slicensinginformation Description: ThisfilecontainsthefoundationalsubclassofNSOperation. History: 15/7/22:CreatedbyTonyGuan. 15/8/20:Addsocketlibrary 15/8/22:Addmathlibrary */ 这个注释只是提供了版权信息、文件内容和历史版本信息等,文件注释要根据自己实际情况包括内容。 文档注释 文档注释就是这种注释内容能够生成API帮助文档。文档注释主要对类型、属性、方法或函数等功能。 文档注释是稍微将单行注释(//)和多行注释(/*...*/)做一点“手脚”后,就成为了文档注释,单行文档注释(///)和多行文档注释(/**...*/)。 下面代码示例: 1 2 3 4 5 6 7 8 9 10 11 12 import Foundation /** Theprotocolthattypesmayimplementiftheywishtobe notifiedofsignificantoperationlifecycleevents. */ protocolOperationObserver{ ///Invokedimmediatelypriortothe`Operation`'s`execute()`method. funcoperationDidStart(operation:Operation) } 代码中使用了文档注释。 可以使用一些工具将这些文档注释生成API文件 代码注释 程序代码中处理文档注释还需要在一些关键的地方添加代码注释,文档注释一般是给一些看不到源代码的人看的帮助文档,而代码注释是给阅读源代码人参考的。代码注释一般是采用单行注释(//)和多行注释(/*...*/)。 有的时候也会在代码的尾端进行注释,这要求注释内容极短,应该在有足够的空白来分开代码和注释。尾端注释示例代码如下: 1 2 3 init(timeout:NSTimeInterval){ self.timeout=timeout //初始化 } 使用地标注释 随着编码过程深入,工程代码量会增加,任何在这大量的代码中能快速找到需要方法或者是刚才修改过代码呢? 在Swift代码中使用地标注释,然后就可以使用Xcode工具在代码中快速查找了。地标注释有三个: MARK,用于方法或函数的注释。 TODO,表示这里代码有没有完成,还要处理。 FIXME,表示这里修改了代码。 这些注释会出现在Xcode的Jump Bar中。来看一个示例: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class ViewController:UIViewController, UITableViewDataSource,UITableViewDelegate{ varlistTeams:[[String:String]]! overridefuncviewDidLoad(){ super .viewDidLoad() ... } overridefuncdidReceiveMemoryWarning(){ super .didReceiveMemoryWarning() //TODO:释放资源//使用TODO注释 } //MARK:UITableViewDataSource协议方法//使用MARK注释 functableView(tableView:UITableView, numberOfRowsInSectionsection:Int)->Int{ return self.listTeams.count } functableView(tableView:UITableView, cellForRowAtIndexPathindexPath:NSIndexPath)->UITableViewCell{ letcellIdentifier= "CellIdentifier" letcell:UITableViewCell!=tableView .dequeueReusableCellWithIdentifier(cellIdentifier, forIndexPath:indexPath)as?UITableViewCell //FIXME:修改bug//使用了FIXME注释 letrow=indexPath.row letrowDict=self.listTeams[row]as[String:String] ... return cell } //MARK:UITableViewDelegate协议方法//使用MARK注释 functableView(tableView:UITableView, didSelectRowAtIndexPathindexPath:NSIndexPath){ ... } } 上述代码中使用三种地标注释,在使用时候后面要跟有一个冒号(:)。 注释之后如果使用呢?打开Xcode的Jump Bar,如下图,这些地标注释会在下拉列表中粗体显示,点击列表项就会跳转到注释行。 本文转自 tony关东升 51CTO博客,原文链接:http://blog.51cto.com/tonyguan/1748343,如需转载请自行联系原作者

优秀的个人博客,低调大师

Hadoop MapReduce概念学习系列之在MapReduce编程时,三大接口抉择(十六)

在编写MapReduce程序方面,可以直接调用Java API接口, 也可以通过Pipes接口使用C/C++编写并行程序, 还可以调用Streaming接口使用任何可以操作标准输入/输出的计算机编程语言来编写MapReduce应用程序。 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5453350.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Hadoop HDFS概念学习系列之HDFS的一致性(十八)

对于一致性,可以分为从客户端和服务端两个不同的视角。 从客户端来看,一致性主要指的是多并发访问时更新过的数据如何获取的问题。从服务端来看,则是更新如何复制分布到整个系统,以保证数据最终一致。一致性是因为有并发读写才有的问题,因此在理解一致性的问题时,一定要注意结合考虑并发读写的场景。 从客户端角度,多进程并发访问时,更新过的数据在不同进程如何获取的不同策略,决定了不同的一致性。对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这是强一致性。如果能容忍后续的部分或者全部访问不到,则是弱一致性。如果经过一段时间后要求能访问到更新后的数据,则是最终一致性 从服务端角度,如何尽快将更新后的数据分布到整个系统,降低达到最终一致性的时间窗口,是提高系统的可用度和用户体验非常重要的方面。对于分布式数据系统: N — 数据复制的份数 W — 更新数据时需要保证写完成的节点数 R — 读取数据的时候需要读取的节点数 如果W+R>N,写的节点和读的节点重叠,则是强一致性。例如对于典型的一主一备同步复制的关系型数据库,N=2,W=2,R=1,则不管读的是主库还是备库的数据,都是一致的。 如果W+R<=N,则是弱一致性。例如对于一主一备异步复制的关系型数据库,N=2,W=1,R=1,则如果读的是备库,就可能无法读取主库已经更新过的数据,所以是弱一致性。 对于分布式系统,为了保证高可用性,一般设置N>=3。不同的N,W,R组合,是在可用性和一致性之间取一个平衡,以适应不同的应用场景。 如果N=W,R=1,任何一个写节点失效,都会导致写失败,因此可用性会降低,但是由于数据分布的N个节点是同步写入的,因此可以保证强一致性。 如果N=R,W=1,只需要一个节点写入成功即可,写性能和可用性都比较高。但是读取其他节点的进程可能不能获取更新后的数据,因此是弱一致性。这种情况下,如果W<(N+1)/2,并且写入的节点不重叠的话,则会存在写冲突 文件系统的一致性模型描述了文件读/写的可见性。 HDFS牺牲了一些POSIX的需求来补偿性能,所以有些操作可能会和传统的文件系统不同。 当创建一个文件时,它在文件系统的命名空间中是可见的,代码如下: Path p = new Path("p"); fs.create(p); assertThat(fs.exists(p),is(true)); 但是对这个文件的任何写操作不保证是可见的,即使在数据流已经刷新的情况下,文件的长度很长时间也会显示为0 : Path p = new Path("p"); OutputStream out = fs.create(p); out.write("content".getBytes("UTF-8")); out.flush(); assertThat(fs.getFileStatus(p),getLen(),is(0L)); 一旦一个数据块写人成功,那么大家提出的新请求就可以看到这个块,而对当前写入的块,大家是看不见的。HDFS提供了使所有缓存和DataNode之间的数据强制同步的方法,这个方法是FSDataOutputStream中的sync()函数。当sync()函数返回成功时,HDFS就可以保证此时写入的文件数据是一致的并且对于所有新的用户都是可见的。即使HDFS客户端之间发生冲突,也会导致数据丢失,代码如下: Path p = new Path("p"); FSDataOutputStream out = fs.create(p); out.write("content".getBytes("UTF-8")); out.flush(); out.sync(); assertThat(fs.getFileStatus(p),getLen(),is(((long) "content" .length())); 这个操作类似于UNIX系统中的fsync系统调用,为一个文件描述符提交缓存数据,利用Java API写入本地数据,这样就可以保证看到刷新流并且同步之后的数据,代码如下: FileOutputStream out = new FileOutStream(localFile); out.write("content".getBytes("UTF-8")); out.flush(); // flush to operatig system out.getFD().sync(); // sync to disk assertThat(fs.getFileStatus(p),getLen(),is(((long) "content" .length())); 在HDFS中关闭一个文件也隐式地执行了sync()函数,代码如下: Path p = new Path("p"); OutputStream out = fs.create(p); out.write("content".getBytes("UTF-8")); out.close(); assertThat(fs.getFileStatus(p),getLen(),is(((long) "content" .length())); 下面来了解一致性模型对应用设计的重要性。文件系统的一致性与设计应用程序的方法有关。如果不调用sync(),那么需要做好因客户端或者系统发生故障而丢失部分数据做好准备。对大多数应用程序来说,这是不可接受的,所以需要在合适的地方调用sync(),比如在写入一定量的数据之后。尽管sync()被设计用来最大限度地减少HDFS的负担,但是它仍然有不可忽视的开销,所以需要在数据健壮性和吞吐最之间做好权衡,其中一个较好的参考平衡点就是:通过测试应用程序来选择不同sync()频率间的最佳平衡点。 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5140724.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Hadoop Hive概念学习系列之hive里的优化和高级功能(十四)

在一些特定的业务场景下,使用hive默认的配置对数据进行分析,虽然默认的配置能够实现业务需求,但是分析效率可能会很低。 Hive有针对性地对不同的查询进行了优化。在Hive里可以通过修改配置的方式进行优化。 以下,几种方式调优的属性。 1、列裁剪 在通过Hive读取数据的时候,并不是所有的需求都要获取表内的所有的数据。有些只需要读取所有列中的几列,而忽略其他列的的数据。 例如,表Table1包含5个列Column1、Column2、Column3、Column4、Column5。下面的语句只会在表Table1中读取Column1、Column2、Column5三列, Column3和Column4将被忽略。 SELECTColumn1,Column2 FROM Table1 WHEREColumn5<1000; 列裁剪的设置为hive.optimize.cp,默认为true。 2、分区裁剪 在Hive中,可以根据多个维度对Hive表进行分区操作,且分区也可以多层嵌套。当有需要对目标表的某一个区域内的数据进行分析而不需要设计其他区域时,可以使用分区裁剪,将目标区域以条件的形式放在HiveQL中。 SELECT * FROM(SELECT c1 FROM T GROUP BY c1) idi WHERE idi.prtn = 100; SELECT * FROM T1 JOIN(SELECT * FROM T2) idi ON (T1.c1=idi.c2) WHERE idi.prtn=100; 以上语句在执行时,会在子查询中考虑idi.prtn=100这个条件,从而减少读入的数据的分区数目。分区裁剪的配置为hive.optimize.pruner,默认为hive。 3、join Hive同样支持join多表连接查询,例如内连接、外连接(左外连接、右外连接、全外连接)、半连接等。对于一条语句中有多个join的情况,当join的条件相同时,即使有多张表,都会合并为一个MapReduce,并不是多个MapReduce。 SELECT pv.ppid, u.name FROM propertie_view pc JOIN user u ON(pv.userid = u.userid) JOIN usergroup x ON(u.userid = x.userid) 如果join的条件不相同,MapReduce的任务数目和join操作的数据是对应的,例如: SELECT pv.ppid, u.name FROM propertie_view pv JOIN user u ON(pv.userid = u.userid) JOIN usergroup x ON(u.age = x.age) 在使用写有join操作的查询语句时,由于在join操作的Reduce阶段,join执行时会将join操作符左边的表的内容加载进内存,所以写语句时应将条目少的表/子查询放在join操作符的左边,这样可以有效减少内存溢出错误的几率。 4、MapJoin MapJoin的合理使用同样能起到调优的效果。在实际的应用中有可能遇到join执行效率很低甚至不能执行的状况。例如,需要做不等值join,或者在join时有一个表极小。由于MapJoin会把小表全部读入内存中,Join操作在Map阶段完成,在Map阶段直接将另外一个表的数据和内存中表数据做匹配,所以不会对任务进行速度产生很大影响。即使笛卡尔积也是如此。 例如: SELECT /*+ MAPJOIN(pc) */ pv.ppid, u.name FROM propertie_view pv JOIN user u ON (pv.userid=u.userid) 5、Group By优化 (1)Map端局部聚合 众所周知,MapReduce计算框架中Reduce起到聚合操作的作用,但并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作可以先在Map端进行局部聚合,最后在Reduce端做全局聚合得出最终结果。是否在Map端进行聚合由hive.map.agger控制,默认为True。 而hive.groupby.mapagger.checkinterval用于控制在Map端进行聚合操作的条目数目,默认为100000。 (2)数据倾斜 有数据倾斜的时候需要进行负载均衡。是否需要进行负载均衡由hive.groupby.skewindata控制,默认为false。 当选项设定为true时,编译器生成的查询计划会有两个MR Job。 第一个MR Job中的Map的输出结果集合会随机分布到第一个MR Job的Reduce中,每个Reduce做局部聚合操作作为输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而实现负载均衡的目的; 第二个MR Job再根据预处理的数据结果按照Group By Key分布到相应的Reduce中,最后完成最终的聚合操作。 Hive性能调优 1、优化的常用手段 2、Hive的数据类型方面的优化 3、Hive的数据类型方面的优化 什么是数据倾斜? 由于数据的不均衡原因,导致数据分布不均匀,造成数据大量的集中到一点,造成数据热点 Hadoop框架的特性 不怕数据大,怕数据倾斜 jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个jobs,耗时很长。 原因是map reduce作业初始化的时间是比较长的 sum,count,max,min等UDAF,不怕数据倾斜问题,hadoop在map端的汇总合并优化,使数据倾斜不成问题 count(distinct ),在数据量大的情况下,效率较低,因为count(distinct)是按group by 字段分组,按distinct字段排序,一般这种分布方式是很倾斜的 1、优化的常用手段 解决数据倾斜问题 减少job数 设置合理的map reduce的task数,能有效提升性能。 了解数据分布,自己动手解决数据倾斜问题是个不错的选择 数据量较大的情况下,慎用count(distinct)。 对小文件进行合并,是行至有效的提高调度效率的方法。 优化时把握整体,单个作业最优不如整体最优。 2、Hive的数据类型方面的优化 优化原则 合理的设置Buckets。在一些大数据join的情况下,map join有时候会内存不够。 如果使用Bucket Map Join的话,可以只把其中的一个bucket放到内存中,内存中原来放不下的内存表就变得可以放下。 这需要使用buckets的键进行join的条件连结,并且需要如下设置 set hive.optimize.bucketmapjoin = true 3、Hive的操作方面的优化 3.1 全排序 Hive的排序关键字是SORT BY, 它有意区别于传统数据库的ORDER BY也是为了强调两者的区别–SORT BY只能在单机范围内排序。 3.2 做笛卡尔积 当Hive设定为严格模式(hive.mapred.mode=strict)时,不允许在HQL语句中出现笛卡尔积 MapJoin是的解决办法 MapJoin,顾名思义,会在Map端完成Join操作。这需要将Join操作的一个或多个表完全读入内存 MapJoin的用法是在查询/子查询的SELECT关键字后面添加/*+ MAPJOIN(tablelist) */提示优化器转化为MapJoin(目前Hive的优化器不能自动优化MapJoin) 其中tablelist可以是一个表,或以逗号连接的表的列表。tablelist中的表将会读入内存,应该将小表写在这里 在大表和小表做笛卡尔积时,规避笛卡尔积的方法是,给Join添加一个Join key,原理很简单:将小表扩充一列join key,并将小表的条目复制数倍,join key各不相同;将大表扩充一列join key为随机数。 3.3 控制Hive的Map数 通常情况下,作业会通过input的目录产生一个或者多个map任务 主要的决定因素有: input的文件总个数, input的文件大小, 集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改) 是不是map数越多越好? 答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。 是不是保证每个map处理接近128m的文件块,就高枕无忧了? 答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。 针对上面的问题3和4,我们需要采取两种方式来解决:即减少map数和增加map数; 举例 a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数 b)假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数 即如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块 3.4 决定reducer个数 Hadoop MapReduce程序中,reducer个数的设定极大影响执行效率 不指定reducer个数的情况下,Hive会猜测确定一个reducer个数,基于以下两个设定: 参数1:hive.exec.reducers.bytes.per.reducer(默认为1G) 参数2 :hive.exec.reducers.max(默认为999) 计算reducer数的公式 N=min(参数2,总输入数据量/参数1) 依据Hadoop的经验,可以将参数2设定为0.95*(集群中TaskTracker个数) reduce个数并不是越多越好 同map一样,启动和初始化reduce也会消耗时间和资源; 另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题。 什么情况下只有一个reduce? 很多时候你会发现任务中不管数据量多大,不管你有没有设置调整reduce个数的参数,任务中一直都只有一个reduce任务; 其实只有一个reduce任务的情况,除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外,还有以下原因: a)没有group by的汇总 b)用了Order by 3.5 合并 MapReduce 操作 Multi-group by Multi-group by是Hive的一个非常好的特性,它使得Hive中利用中间结果变得非常方便。 FROM log insert overwrite table test1 select log.id group by log.id insert overwrite table test2 select log.name group by log.name 上述查询语句使用了Multi-group by特性连续group by了2次数据,使用不同的group by key。这一特性可以减少一次MapReduce操作。 Bucket 与 Sampling Bucket是指将数据以指定列的值为key进行hash,hash到指定数目的桶中。这样就可以支持高效采样了。 Sampling可以在全体数据上进行采样,这样效率自然就低,它还是要去访问所有数据。而如果一个表已经对某一列制作了bucket,就可以采样所有桶中指定序号的某个桶,这就减少了访问量。 如下例所示就是采样了test中32个桶中的第三个桶。 SELECT * FROM test 、、、TABLESAMPLE(BUCKET 3 OUT OF 32); 3.6 JOIN 原则 在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作符的左边 原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生 OOM 错误的几率。 Map Join Join 操作在 Map 阶段完成,不再需要Reduce,前提条件是需要的数据在 Map 的过程中可以访问到。 例如: INSERT OVERWRITE TABLE phone_traffic SELECT /*+ MAPJOIN(phone_location) */ l.phone,p.location,l.traffic from phone_location p join log l on (p.phone=l.phone) 相关的参数为: hive.join.emit.interval = 1000 How many rows in the right-most join operand Hive should buffer before emitting the join result. hive.mapjoin.size.key = 10000 hive.mapjoin.cache.numrows = 10000 3.7 Group By Map 端部分聚合 并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果基于 Hash 参数包括: hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True hive.groupby.mapaggr.checkinterval = 100000 在 Map 端进行聚合操作的条目数目。 有数据倾斜的时候进行负载均衡 hive.groupby.skewindata = false 当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中, Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果, 这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的; 第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中), 最后完成最终的聚合操作。 3.8 合并小文件 文件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map 和 Reduce 的结果文件来消除这样的影响: hive.merge.mapfiles = true 是否和并 Map 输出文件,默认为 True hive.merge.mapredfiles = false 是否合并 Reduce 输出文件,默认为 False hive.merge.size.per.task = 256*1000*1000 合并文件的大小 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6105389.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Hadoop HDFS概念学习系列之NameNode和Secondary NameNode通信模型(十一)

NameNode将对文件系统的改动追加保存到本地文件系统上的一个日志文件edits。当一个NameNode启动时,它首先从一个映像文件(fsimage)中读取HDFS的状态,接着执行日志文件中的编辑操作。然后将新的HDFS状态写人fsimage中,井使用个空的edits文件开始正常操作。因为NameNode只有在启动阶段才合并fsimage和edits,久而久之日志文件可能会变得非常庞大,特别是对于大型的集群。日志文件太大的另一个副作用是下一次NameNode启动会很长世间,NameNode和Secondary NameNode之间的通信示意图如图所示。 如上图所示。NameNode和Secondary NameNode间数据的通信使用的是HTTP协议,Secondary NameNode定期合并fsimage和edits日志,将edits日志文件大小控制在一个限度下。因为内存需求和NameNode在一个数量级上、所以通常Secondary NameNode和NameNode运行在不同的机器上。Secondary NameNode通过bin/start-dfs.sh在conf/masters中指定的节点上启动。 Secondary NameNode的检查点进程启动,是由以下两个配置参数控制的: (1)fs.checkpoint.period指定连续两次检查点的最大时间间隔,默认值是1小时。 (2)fs.checkpoint.size定义了日志文件的最大值,一旦超过这个值会导致强制执行检查点(即使没到检查点的最大时问间隔),默认值是64MB。 Secondary NameNode保存最新检查点的目录与NameNode的目录结构相同。所以NameNode 可以在需要的时候读取Secondary NameNode上的检查点镜像。 如果NameNode上除了最新的检查点以外,所有的其他历史镜像和edits文件都丢失了,NameNode可以引入这个最新的检查点。以下操作可以实现这个功能: 1) 在配置参数dfs.name.dir指定的位置建立一个空文件夹。 2) 把检查点日录的位置赋值给配置参数fs.checkpoint.dir。 3) 启动NameNode,加上-importCheckpoint。 NameNode会从fs.checkpoint.dir目录读取检查点,并把它保存在dfs.name.dir日录下。如果dfs.name.dir目录下有合法的镜像文件,NameNode会启动失败。NameNode会检查fs.checkpoint.dir 目录下镜像文件的一致性,但是不会去改动它。 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5081598.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Hadoop Hive概念学习系列之hive里的用户定义函数UDF(十七)

Hive可以通过实现用户定义函数(User-Defined Functions,UDF)进行扩展(事实上,大多数Hive功能都是通过扩展UDF实现的)。想要开发UDF程序,需要继承org.apache.hadoop.ql.exec.UDF类,并重载evaluate方法。Hive API提供@Description声明,使用声明可以在代码中添加UDF的具体信息。在Hive中可以使用DESCRIBE语句来展现这些信息。 Hive的源码本身就是编写UDF最好的参考资料。在Hive源代码中很容易就能找到与需求功能相似的UDF实现,只需要复制过来,并加以适当的修改就可以满足需求。 下面是一个具体的UDF例子,该例子的功能是将字符串全部转化为小写字母 package com.madhu.udf; import org.apache.hadoop.hive.ql.exec.Desription; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text; //add jar samplecode.jar; //create temporary function to_upper as 'com.madhu.udf.UpercaseUDF'; @Desription( name="to_upper", value="_FUNC_(str) -Converts a string to uppercase", extended="Example:\n" + " > select to_upper(producer) from videos_ex;\n" + " JOHN MCTIERNAN" ) public class UpercaseUDF extends UDF{ public Text evaluate(Text input){ Text result = new Text(""); if (input != null){ result.set(input.toString().toUpperCase()); } return result; } } UDF只有加入到Hive系统路径,并且使用唯一的函数名注册后才能在Hive中使用。UDF应该被打成JAR包。 上传打好的samplecode.jar,然后如下 下面的语句可以把JAR条件放入Hive系统路径,并注册相关函数: hive >add jarsamplecode.jar 这个目录,根据自己的情况而定 Added samplecode.jar to class path Added resource:samplecode.jar hive>create temporary functionto_upperas'com.madhu.udf.UppercaseUDF'; 现在可以在Hive中使用这个函数了: hive > describe function to_upper; OK to_upper(str) -Converts a string to uppercase Time taken:0.039 seconds,Fetched:1 row(s) hive > describe function extended to_upper; OK to_upper(str) - Converts a string to uppercase Example: > select to_upper(producer) from videos_ex; JOHN MCTIERNAN Time taken:0.07 seconds,Fetched:4 row(s) 手动的话,见 3 hql语法及自定义函数 + hive的java api 自动的话,见 Hive项目开发环境搭建(Eclipse\MyEclipse + Maven) 这里,我自己写了一个hiveEvaluateUDF自定义函数,实现某一个我们自己想要的功能。比如,我这里是转换功能。开始编写代码 package cn.itcast.bigdata; import org.apache.hadoop.hive.ql.exec.UDF; public class hiveEvaluateUDF extends UDF{ public String evaluate(String str){ if (str == null | str.toString().isEmpty()){ return new String(); } return str.trim().toLowerCase(); } } hive>addjarhiveEvaluateUDF.jar; 得到 cn.itcast.bigdata.hiveEvaluateUDF hive>create temporary functionto_loweras'cn.itcast.bigdata.hiveEvaluateUDF'; 剩下的,自行去尝试。 如何用好自己写好的自定义UDF函数 方法一: 比如,我这里,有个转大写的自定义UDF函数,自己写个vi hiveupperrc文件。每次执行这个文件,这个自定义的转大写函数能用了。 方法二: 在$HIVE_HOME/scripts目录下,写个如hiveupperrc.sh脚本。 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6106218.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

Storm概念学习系列之并行度与如何提高storm的并行度

对于storm来说,并行度的概念非常重要!大家一定要好好理解和消化。 storm的并行度,可以简单的理解为多线程。 如何提高storm的并行度?storm程序主要由spout和bolt组成的。spout和bolt在运行期间会生成task实例(new Spout或者new bolt)。 那这些task实例是需要在线程(executor)里面运行的,而线程是需要在进程(worker)里面执行的。 这些,都是可以在代码中控制的到。 1、所以想要提高storm的处理能力,最直接的就是提高executor线程的并行度。 2、提高worker的数量有什么好处呢? 可以间接提高storm的处理能力,因为一个worker进程的处理能力是有限的,如果线程太多了,是需要使用多个进程的,否则,多线程的效率也不高。 假设一个进程里面运行10个线程效率最高,如果你把100个线程都在一个进程里面运行。 3、提高task的数量有什么好处呢? 因为线程内部不能并行处理,所以就算提高线程内部的task的数量,也不能提高storm的并行度。 它的好处是,可以方便后期执行storm的rebalance(弹性计算) 【因为当一个storm程序提交之后,这个程序中的task数目就不会再变了】 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/7247412.html,如需转载请自行联系原作者

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

用户登录
用户注册