首页 文章 精选 留言 我的

精选列表

搜索[官方],共10003篇文章
优秀的个人博客,低调大师

Apache Storm 官方文档 —— 常用模式

本文列出了 Storm 拓扑中使用的一些常见模式,包括: 数据流的 join 批处理 BasicBolt 内存缓存与域分组的结合 Top N 流式计算 TimeCacheMap CoordinatedBolt 与 KeyedFairBolt Joins 数据流的 join 一般指的是通过共有的域来聚合两个或多个数据流的过程。与一般的数据库中 join 操作要求有限的输入与清晰的语义不同,数据流 join 的输入往往是无限的数据集,而且并不具备明确的语义。 join 的类型一般是由应用的需求决定的。有些应用需要将两个流在某个固定时间内的所有 tuple 进行 join,另外一些应用却可能要求对每个 join 域的 join 操作过程的两侧只保留一个 tuple,而其他的应用也许还有一些其他需求。不过这些 join 类型一般都会有一个基本的模式,那就是将多个输入流进行分区。Storm 可以很容易地使用域分组的方法将多个输入流聚集到一个联结 bolt 中,比如下面这样: builder.setBolt("join", new MyJoiner(), parallelism) .fieldsGrouping("1", new Fields("joinfield1", "joinfield2")) .fieldsGrouping("2", new Fields("joinfield1", "joinfield2")) .fieldsGrouping("3", new Fields("joinfield1", "joinfield2")); 当然,上面的代码只是个例子,实际上不同的流完全可以具有不同的输入域。 批处理 通常由于效率或者其他方面的原因,你需要使用将 tuple 们组合成 batch 来处理,而不是一个个分别处理它们。比如,在做数据库更新操作或者流聚合操作时,你就会需要这样的批处理形式。 要确保数据处理的可靠性,正确的方式是在 bolt 进行批处理之前将 tuple 们缓存在一个实例变量中。在完成批处理操作之后,你就可以一起 ack 所有的缓存的 tuple 了。 如果这个批处理 bolt 还需要继续向下游发送 tuple,你可能还需要使用多锚定(multi-anchoring)来确保可靠性。具体怎么做取决于应用的需求。想要了解更多关于可靠性的工作机制的内容请参考消息的可靠性保障一文。 BasicBolt Bolt 处理 tuple 的一种基本模式是在execute方法中读取输入 tuple、发送出基于输入 tuple 的新 tuple,然后在方法末尾对 tuple 进行应答(ack)。符合这种模式的 bolt 一般是一种函数或者过滤器。对于这种基本的处理模式,Storm 提供了IBasicBolt接口来自动实现这个过程。更多内容请参考消息的可靠性保障一文。 内存缓存与域分组的结合 在 Storm 的 bolt 中保存一定的缓存也是一种比较常见的方式。尤其是在于域分组结合的时候,缓存的作用特别显著。例如,假如你有一个用于将短链接(short URLs,例如 bit.ly, t.co,等等)转化成长链接(龙 URLs)的 bolt。你可以通过一个将短链接映射到长链接的 LRU 缓存来提高系统的性能,避免反复的 HTTP 请求操作。假如现在有一个名为 “urls” 的组件用于发送短链接,另外有一个 “expand” 组件用于将短链接扩展为长链接,并且在 “expand” 内部保留一个缓存。让我们来看看下面两段代码有什么不同: builder.setBolt("expand", new ExpandUrl(), parallelism) .shuffleGrouping(1); builder.setBolt("expand", new ExpandUrl(), parallelism) .fieldsGrouping("urls", new Fields("url")); 由于域分组可以使得相同的 URL 永远被发往同一个 task,第二段代码会比第一段代码高效得多。这样可以避免在不同的 task 的缓存中的复制动作,并且看上去短 URL 可以更好地在命中缓存。 Top N Storm 中一种常见的连续计算模式是计算数据流中某种形式的 Top N 结果。假如现在有一个可以以 [“value”, “count”] 的形式发送 tuple 的 bolt,并且你需要一个可以根据 count 计算结果输出前 N 个 tuple 的 bolt。实现这个操作的最简单的方法就是使用一个对数据流进行全局分组的 bolt,并且在内存中维护一个包含 top N 结果的列表。 这种方法并不适用于大规模数据流,因为整个数据流都会发往同一个 task,会造成该 task 的内存负载过高。更好的做法是将数据流分区,同时对每个分区计算 top N 结果,然后将这些结果汇总来得到最终的全局 top N 结果。下面是这个模式的代码: builder.setBolt("rank", new RankObjects(), parallelism) .fieldsGrouping("objects", new Fields("value")); builder.setBolt("merge", new MergeObjects()) .globalGrouping("rank"); 这个方法之所以可行是因为第一个 bolt 的域分组操作确保了每个小分区在语义上的正确性。你可以在storm-starter里看到使用这个模式的一个例子。 当然,如果待处理的数据集存在较严重的数据倾斜,那么还是应该使用 partialKeyGrouping 来代替 fieldsGrouping,因为 partialKeyGrouping 可以通过两个下游 bolt 分散每个 key 的负载。 builder.setBolt("count", new CountObjects(), parallelism) .partialKeyGrouping("objects", new Fields("value")); builder.setBolt("rank" new AggregateCountsAndRank(), parallelism) .fieldsGrouping("count", new Fields("key")) builder.setBolt("merge", new MergeRanksObjects()) .globalGrouping("rank"); 这个拓扑中需要一个中间层来聚合来自上游 bolt 数据流的分区计数结果,但这一层仅仅会做一个简单的聚合处理,这样 bolt 就不会受到由于数据倾斜带来的负载压力。你可以在storm-starter中看到使用这个模式的一个例子。 支持 LRU 的 TimeCacheMap 有时候你可能会需要一个能够保留“活跃的”数据并且能够使得超时的“非活跃的”数据自动失效的缓存。TimeCacheMap是一个可以高效地实现此功能的数据结构。它还提供了一个钩子用于实现在数据失效后的回调操作。 用于分布式 RPC 的 CoordinatedBolt 与 KeyedFairBolt 在构建 Storm 上层的分布式 RPC 应用时,通常会用到两种常用的模式。现在这两种模式已经被封装为CoordinatedBolt和KeyedFairBolt,并且已经加入了 Storm 标准库中。 CoordinatedBolt将你的处理逻辑 bolt 包装起来,并且在你的 bolt 收到了指定请求的所有 tuple 之后发出通知。CoordinatedBolt中大量使用了直接数据流组来实现此功能。 KeyedFairBolt同样包装了你的处理逻辑 bolt,并且可以让你的拓扑同时处理多个 DRPC 调用,而不是每次只执行一个。 如果需要了解更多内容请参考分布式 RPC一文。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— Trident 教程

Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入、有状态的流式处理与低延时的分布式查询无缝结合起来。如果你了解 Pig 或者 Cascading 这样的高级批处理工具,你就会发现他们和 Trident 的概念非常相似。Trident 同样有联结(join)、聚合(aggregation)、分组(grouping)、函数(function)以及过滤器(filter)这些功能。Trident 为数据库或者其他持久化存储上层的状态化、增量式处理提供了基础原语。由于 Trident 有着一致的、恰好一次的语义,因此推断出 Trident 拓扑的状态也是一件很容易的事。 使用范例 让我们先从一个使用 Trident 的例子开始。这个例子中做了两件事情: 从一个句子的输入数据流中计算出单词流的数量 实现对一个单词列表中每个单词总数的查询 为了实现这个目的,这个例子将会从下面的数据源中无限循环地读取语句数据流: FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true); 这个 Spout 会循环地访问语句集来生成语句数据流。下面的代码就是用来实现计算过程中的单词数据流统计部分: TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .parallelismHint(6); 让我们一行行地来分析上面的代码。首先我们创建了一个TridentTopology对象,这个对象提供了构造 Trident 计算过程的接口。TridentTopology有一个叫做newStream的方法,这个方法可以从一个输入数据源中读取数据创建一个新的数据流。在这个例子中,输入的数据源就是前面定义的FixedBatchSpout。输入数据源也可以是像 Kestrel 和 Kafka 这样的消息系统。Trident 会通过 ZooKeeper 一直跟踪每个输入数据源的一小部分状态(Trident 具体消费对象的相关元数据)。例如这里的 “spout1” 就对应着 ZooKeeper 中的一个节点,而 Trident 就会在该节点中存放数据源的元数据(metadata)。 Trident 会将数据流处理为很多个小块 tuple 的集合,例如,输入的句子流就会像下面这样被分割成很多个小块: 这些小块的大小主要取决于你的输入吞吐量,一般可能会在数万甚至数百万元组的级别。 Trident 为这些小块提供了一个完全成熟的批处理 API。这个 API 和你见到过的 Pig 或者 Cascading 这样的 Hadoop 的高级抽象语言很相似:你可以处理分组(group by)、联结(join)、聚合(aggregation)、函数(function)、过滤器(filter)等各种操作。当然,分别处理每个小块并不是件好事,所以,Trident 提供了适用于处理各个小块之间的聚合操作的函数,并且可以在聚合后将结果保存到持久化存储中,而且无论是内存、Memcached、Cassandra 还是其他类型的存储都可以支持。最后,Trident 还提供了用于查询实时状态结果的一级接口。而这个结果状态既可以像这个例子中演示的那样由 Trident 负责更新,也可以作为一个独立的状态数据源而存在。 再回到这个例子中,输入数据源 spout 发送出了一个名为 “sentence” 的数据流。接下来拓扑中定义了一个Split方法用于处理流中的每个 tuple,这个方法接收 “sentence” 域并将其分割成若干个单词。每个 sentence tuple 都会创建很多个单词 tuple —— 例如 “the cow jumped over the moon” 这个句子就会创建 6 个 “word” tuple,下面是Split的定义: public class Split extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { collector.emit(new Values(word)); } } } 从上面的代码中你会发现这个过程真的很简单。这个方法中的所有操作仅仅是抓取句子、以空格分隔句子并且为每个单词发射一个 tuple。 拓扑的剩余部分负责统计单词的数量并将结果保存到持久化存储中。首先,数据流根据 “word” 域分组,然后使用Count聚合器持续聚合每个小组。persistentAggregate方法用于存储并更新 state 源中的聚合结果。在这个例子中,单词的数量结果是保存在内存中的,不过可以根据需要切换到 Memcached、Cassandra 或者其他持久化存储中。切换存储模型也非常简单,只需要像下面这样(使用trident-memcached修改persistentAggregate行中的一个参数(其中,“serverLocations” 是 Memcached 集群的地址/端口列表)即可: .persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count")) persistentAggregate方法所存储的值就表示所有从数据流中发送出来的块的聚合结果。 Trident 的另一个很酷的特性就是它支持完全容错性和恰好一次处理的语义。如果处理过程中出现错误需要重新执行处理操作,Trident 不会向数据库中提交多次来自相同的源数据的更新操作,这就是 Trident 持久化 state 的方式。 persistentAggregate方法也可以将数据流结果传入一个TridentState对象中。这种情况下,这个TridentState就表示所有的单词统计信息。这样我们就可以使用TridentState对象来实现整个计算过程中的分布式查询部分。 接下来我们就可以在拓扑中实现 word count 的一个低延时分布式查询。这个查询接收一个由空格分隔的单词列表作为参数,然后返回这些单词的数量统计结果。这个查询看上去与普通的 RPC 调用并没有什么分别,不过在后台他们是并发执行的。下面是一个实现这种查询的例子: DRPCClient client = new DRPCClient("drpc.server.location", 3772); System.out.println(client.execute("words", "cat dog the man"); // prints the JSON-encoded result, e.g.: "[[5078]]" 如你所见,这个查询看上去只是一个普通的远程过程调用(RPC),不过在后台他是在一个 Storm 集群中并发执行的。这种查询的端到端延时一般在 10 ms 左右。当然,更大量的查询会花费更长的时间,尽管这些查询还是取决于你为这个计算过程分配了多少时间。 拓扑中的分布式查询的实现是这样的: topology.newDRPCStream("words") .each(new Fields("args"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); 这里还需要使用前面的TridentTopology对象来创建一个 DRPC 数据流,这个创建数据流的方法叫做 “words”。前面使用DRPCClient进行 RPC 调用的第一个参数必须与这个方法名完全相同。 在这段代码里,首先是使用Split方法来将请求的参数分割成若干个单词。这些单词构成的单词流是通过 “word” 域来分组的,而stateQuery运算符就是用来查询拓扑中第一个部分中生成的TridentState对象的。stateQuery接收一个 state(在这个例子中就是拓扑前面计算得到的单词数结果)和查询这个 state 的方法作为参数。在这个例子里,stateQuery调用了MapGet方法,用于获取每个单词的个数。由于 DRPC 数据流是和 TridentState 采用的完全相同的方式进行分组的(通过 “word” 域),每个单词查询都可以精确地定位到 TridentState 对象中的指定部分,同时 TridentState 对象中维护着对应的单词的更新状态。 接下来,个数为 0的单词会被FilterNull过滤器过滤掉,然后就可以使用Sum聚合器来获取其他的单词统计个数。接着 Trident 就会自动将结果返回给等待的客户端。 Trident 很聪明,它知道怎么以最好的性能运行拓扑。在这个拓扑中还有两个会自动发生的有趣的事: 从 state 中读取或写入的操作(例如 persistentAggregate 和 stateQuery)会自动批处理化。因此,如果当前的批处理过程需要对数据库执行 20 个更新操作,Trident 就会自动将读取或写入操作当作批处理过程,仅仅会对数据库发送一次读请求和一次写请求,而不是发送 20 次读请求和 20 次写请求(而且一般你还可以在你的 state 里使用缓存来消除读请求)。这样做就有两个方面的好处:可以按照你指定的方式来执行你的计算过程,同时还可以维持较好的性能。 Trident 的聚合器是高度优化的。在向网络中发送 tuple 之前,Trident 有时候会做部分聚合操作,而不是将一个分组的所有的 tuple 一股脑地发送到同一台机器中来执行聚合。例如,Count聚合器就是这样先计算每个小块的个数,然后向网络中发送很多个部分计数的结果,接着再将所有的部分计数结果汇总来得到最终的统计结果。这个技术与 MapReduce 的 combiner 模型很相似。 我们再来看看 Trident 的另一个例子。 Reach 这个例子是一个纯粹的 DRPC 拓扑,计算了一个指定 URL 的 Reach 数。Reach 指的是 Twitter 上能够看到一个指定的 URL 的独立用户数。要想计算 Reach,你需要先提取所有转发了该 URL 的用户,提取这些用户的关注者,将关注者放入一个 set 集合中来去除重复的关注者,然后再统计这个 set 中的数量。对于单一的一台机器来说,计算 reach 太耗时了,这个过程大概需要数千次数据库调用并生成数千万 tuple。而使用 Storm 和 Trident 就可以通过一个集群来将计算过程的每个步骤进行并行化处理。 这个拓扑会从两个 state 源中读取数据。其中一个数据库建立了 URL 和转发了该 URL 的用户列表的关联表。另一个数据库中建立了用户和用户的关注者列表的关联表。拓扑的定义是这样的: TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState()); TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState()); topology.newDRPCStream("reach") .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")) .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) .shuffle() .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) .parallelismHint(200) .each(new Fields("followers"), new ExpandList(), new Fields("follower")) .groupBy(new Fields("follower")) .aggregate(new One(), new Fields("one")) .parallelismHint(20) .aggregate(new Count(), new Fields("reach")); 这个拓扑使用newStaticState方法创建了两个分别对应外部于两个外部数据库的TridentState对象。在拓扑的后续部分就可以对这两个TridentState对象执行查询操作。和 state 的所有数据源一样,为了最大程度地提升效率,对这些数据库的查询将会自动地批处理化。 拓扑的定义很直接 —— 就是一个简单的批处理 job。首先,会通过查询 urlToTweeters 数据库来获取转发了 URL 的用户列表,然后就可以调用ExpandList方法来为每个 tweeter 创建一个 tuple。 接下来必须要获取每个 tweeter 的关注者。由于需要调用 shuffle 方法将所有的 tweeter 均衡分配到拓扑的所有 worker 中,所以这个步骤必须并发进行,这一点非常重要。然后就可以查询关注者数据库来获取每个 tweeter 的关注者列表。你可能注意到了这个过程的并行度非常高,因为这是整个计算过程中复杂度最高的部分。 再接下来,关注者就会被放入一个单独的 set 集合中用于计数。这里包含两个步骤。首先,会根据 “follower” 域来执行 “group by” 分组操作,并在每个组上运行One聚合器。“One”聚合器的作用仅仅是为每个组发送一个包含数字 1 的 tuple。然后,就可以通过统计这些 one 结果来得到关注者 set 的大小,也就是真正的关注者数量。下面是 “One” 聚合器的定义: public class One implements CombinerAggregator<Integer> { public Integer init(TridentTuple tuple) { return 1; } public Integer combine(Integer val1, Integer val2) { return 1; } public Integer zero() { return 1; } } 这是一个“组合聚合器”,它知道怎样在向网络中发送 tuple 之前以最好的效率进行部分聚合操作。同样,Sum 也是一个组合聚合器,所以在拓扑结尾的全局统计操作也会有很高的效率。 下面让我们再来看看 Trident 中的一些细节。 域(Fields)与元组(tuples) Trident 的数据模型 TridentTuple 是一个指定的值列表。在一个拓扑中,tuple 是在一系列操作中不断生成的。这些操作一般会输入一个“输入域”(input fields)集合,然后发送出一个“方法域”(function fields)的集合。输入域主要用于选取一个 tuple 的子集作为操作的输入,而“方法域”主要用于为该操作的输出结果域命名。 我们来看看这样一个场景。假设你有一个名为 “stream” 的数据流,其中包含域 “x”、“y” 和 “z”。如果要运行一个接收 “y” 作为输入的过滤器 MyFilter,你可以这样写: stream.each(new Fields("y"), new MyFilter()) 再假设 MyFilter 的实现是这样的: public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) < 10; } } 这样就会保留所有 “y” 域的值小于 10 的 tuple。MyFilter 输入的 TridentTuple 将会仅包含有 “y” 域。值得注意的是,Trident 可以在选取输入域时以一种非常高效的方式来投射 tuple 的子集:这个投射过程非常灵活。 我们再来看看 “function fields” 是怎么工作的。假设你有这样一个函数: public class AddAndMultiply extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { int i1 = tuple.getInteger(0); int i2 = tuple.getInteger(1); collector.emit(new Values(i1 + i2, i1 * i2)); } } 这个函数接收两个数字作为输入,然后发送出两个新值:分别是两个数字的和和乘积。再假定你有一个包含 “x”、“y” 和 “z” 域的数据流,你可以这样使用这个函数: stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied")); 这个函数的输出增加了两个新的域。因此,这个 each 调用的输出 tuple 会包含 5 个域:“x”、“y” 、“z”、“added” 和 “multiplied”。其中 “added” 与 AddAndMultiply 的第一个输出值相对应,“multiplied” 和 AddAndMultiply 的第二个输出值相对应。 另一方面,通过聚合器,函数域也可以替换输入 tuple 的域。假如你有一个包含域 “val1” 和域 “val2” 的数据流,通过这样的操作: stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum")) 就会使得输出数据流中只包含一个只带有 “sum” 的域的 tuple,这个 “sum” 域就代表了在哪个批处理块中所有的 “val2” 域的总和值。 通过数据流分组,输出就可以同时包含用于分组的域以及由聚合器发送的域。举个例子: stream.groupBy(new Fields("val1")) .aggregate(new Fields("val2"), new Sum(), new Fields("sum")) 这个操作就会使得输出同时包含域 “val1” 以及域 “sum”。 State 实时计算的一个关键问题就在于如何管理状态(state),使得在失败与重试操作之后的更新过程仍然是幂等的。错误是不可消除的,所以在出现节点故障或者其他问题发生时批处理操作还需要进行重试。不过这里最大的问题就在于怎样执行一种合适的状态更新操作(不管是针对外部数据库还是拓扑内部的状态),来使得每个消息都能够被执行且仅仅被执行一次。 这个问题很麻烦,接下来的例子里面就有这样的问题。假如你正在对你的数据流做一个计数聚合操作,并且打算将计数结果存储到一个数据库中。如果你仅仅把计数结果存到数据库里就完事了的话,那么在你继续准备更新某个块的状态的时候,你没法知道到底这个状态有没有被更新过。这个数据块有可能在更新数据库的步骤上成功了,但在后续的步骤中失败了,也有可能先失败了,没有进行更新数据库的操作。你完全不知道到底发生了什么。 Trident 通过下面两件事情解决了这个问题: 在 Trident 中为每个数据块标记了一个唯一的 id,这个 id 就叫做“事务 id”(transaction id)。如果数据块由于失败回滚了,那么它持有的事务 id 不会改变。 State 的更新操作是按照数据块的顺序进行的。也就是说,在成功执行完块 2 的更新操作之前,不会执行块 3 的更新操作。 基于这两个基础特性,你的 state 更新就可以实现恰好一次(exactly-once)的语义。与仅仅向数据库中存储计数不同,这里你可以以一个原子操作的形式把事务 id 和计数值一起存入数据库。在后续更新这个计数值的时候你就可以先比对这个数据块的事务 id。如果比对结果是相同的,那么就可以跳过更新操作 —— 由于 state 的强有序性,可以确定数据库中已经包含有当前数据库的额值。而如果比对结果不同,就可以放心地更新计数值了。 当然,你不需要在拓扑中手动进行这个操作,操作逻辑已经在 State 中封装好了,这个过程会自动进行。同样的,你的 State 对象也不一定要实现事务 id 标记:如果你不想在数据库里耗费空间存储事务 id,你就不用那么做。在这样的情况下,State 会在出现失败的情形下保持“至少处理一次”的操作语义(这样对你的应用也是一件好事)。在这篇文章里你可以了解到更多关于如何实现 State 以及各种容错性权衡技术。 你可以使用任何一种你想要的方法来实现 state 的存储操作。你可以把 state 存入外部数据库,也可以保存在内存中然后在存入 HDFS 中(有点像 HBase 的工作机制)。State 也并不需要一直保存某个状态值。比如,你可以实现一个只保存过去几个小时数据并将其余的数据删除的 State。这是一个实现 State 的例子:Memcached integration。 Trident 拓扑的运行 Trident 拓扑会被编译成一种尽可能和普通拓扑有着同样的运行效率的形式。只有在请求数据的重新分配(比如 groupBy 或者 shuffle 操作)时 tuple 才会被发送到网络中。因此,像下面这样的 Trident 拓扑: 就会被编译成若干个 spout/bolt: 总结 Trident 让实时计算变得非常简单。从上面的描述中,你已经看到了高吞吐量的数据流处理、状态操作以及低延时查询处理是怎样通过 Trident 的 API 来实现无缝结合的。总而言之,Trident 可以让你以一种更加自然,同时仍然保持着良好的性能的方式来实现实时计算。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— Trident State

Trident 中含有对状态化(stateful)的数据源进行读取和写入操作的一级抽象封装工具。这个所谓的状态(state)既可以保存在拓扑内部(保存在内存中并通过 HDFS 来实现备份),也可以存入像 Memcached 或者 Cassandra 这样的外部数据库中。而对于 Trident API 而言,这两种机制并没有任何区别。 Trident 使用一种容错性的方式实现对 state 的管理,这样,即使在发生操作失败或者重试的情况下状态的更新操作仍然是幂等的。基于这个机制,每条消息都可以看作被恰好处理了一次,然后你就可以很容易地推断出 Trident 拓扑的状态。 State 的更新过程支持多级容错性保证机制。在讨论这一点之前,我们先来看一个例子,这个例子展示了如何实现恰好一次的语义的技术。假如你正在对数据流进行一个计数聚合操作,并打算将计数结果存入数据库中。在这个例子里,你存入数据库的就是一个对应计数结果的值,每次处理新 tuple 的时候就会增加这个值。 考虑到可能存在的处理失败情况,tuple 有可能需要重新处理。这样就给 state 的更新操作带来了一个问题(或者其他的副作用)—— 你无法知道当前的这个 tuple 的更新操作是否已经处理过了。也许你之前没有处理过这个 tuple,那么你现在就需要增加计数结果;也许你之前已经处理过 tuple 了并且成功地增加了计数结果,但是在后续操作过程中 tuple 的处理失败了,并由此引发了 tuple 的重新处理操作,这时你就不能再增加计数结果了;还有可能你之前在使用这个 tuple 更新数据库的时候出错了,也就是说计数值的更新操作并未成功,此时在 tuple 的重新处理过程中你仍然需要更新数据库。 所以说,如果只是向数据库中简单地存入计数值,你确实无法知道 tuple 是否已经被处理过。因此,你需要一些更多的信息来做决定。Trident 提供了一种支持恰好一次处理的语义,如下所述: 通过小数据块(batch)的方式来处理 tuple(可以参考Trident 教程一文) 为每个 batch 提供一个唯一的 id,这个 id 称为 “事务 id”(transaction id,txid)。如果需要对 batch 重新处理,这个 batch 上仍然会赋上相同的 txid。 State 的更新操作是按照 batch 的顺序进行的。也就是说,在 batch 2 完成处理之前,batch 3 的状态更新操作不会进行。 基于这几个基本性质,你的 State 的实现就可以检测到 tuple 的 batch 是否已经被处理过,并根据检测结果选择合适的 state 更新操作。你具体采用的操作取决于你的输入 spout 提供的语义,这个语义对每个 batch 都是有效的。有三类支持容错性的 spout:“非事务型”(non-transactional)、“事务型”(transactional)以及“模糊事务型”(opaque transactional)。接下来我们来分析下每种 spout 类型的容错性语义。 事务型 spout(Transactional spouts) 记住一点,Trident 是通过小数据块(batch)的方式来处理 tuple 的,而且每个 batch 都会有一个唯一的 txid。spout 的特性是由他们所提供的容错性保证机制决定的,而且这种机制也会对每个 batch 发生作用。事务型 spout 包含以下特性: 每个 batch 的 txid 永远不会改变。对于某个特定的 txid,batch 在执行重新处理操作时所处理的 tuple 集和它的第一次处理操作完全相同。 不同 batch 中的 tuple 不会出现重复的情况(某个 tuple 只会出现在一个 batch 中,而不会同时出现在多个 batch 中)。 每个 tuple 都会放入一个 batch 中(处理操作不会遗漏任何的 tuple)。 这是一种很容易理解的 spout,其中的数据流会被分解到固定的 batches 中。Storm-contrib 项目中提供了一种基于 Kafka 的事务型 spout 实现。 看到这里,你可能会有这样的疑问:为什么不在拓扑中完全使用事务型 spout 呢?这个原因很好理解。一方面,有些时候事务型 spout 并不能提供足够可靠的容错性保障,所以不需要使用事务型 spout。比如,TransactionalTridentKafkaSpout的工作方式就是使得带有某个 txid 的 batch 中包含有来自一个 Kafka topic 的所有 partition 的 tuple。一旦一个 batch 被发送出去,在将来无论重新发送这个 batch 多少次,batch 中都会包含有完全相同的 tuple 集,这是由事务型 spout 的语义决定的。现在假设TransactionalTridentKafkaSpout发送出的某个 batch 处理失败了,而与此同时,Kafka 的某个节点因为故障下线了。这时你就无法重新处理之前的 batch 了(因为 Kafka 的节点故障,Kafka topic 必然有一部分 partition 无法获取到),这个处理过程也会因此终止。 这就是要有“模糊事务型” spout 的原因了 —— 模糊事务型 spout 支持在数据源节点丢失的情况下仍然可以实现恰好一次的处理语义。我们会在下一节讨论这类 spout。 顺便提一点,如果 Kafka 支持数据复制,那么就可以放心地使用事务型 spout 提供的容错性机制了,因为这种情况下某个节点的故障不会导致数据丢失,不过 Kafka 暂时还不支持该特性。(本文的写作时间应该较早,Kakfa 早就已经可以支持复制的机制了 —— 译者注)。 在讨论“模糊事务型” spout 之前,让我们先来看看如何为事务型 spout 设计一种支持恰好一次语义的 State。这个 State 就称为 “事务型 state”,它支持对于特定的 txid 永远只与同一组 tuple 相关联的特性。 假如你的拓扑需要计算单词数,而且你准备将计数结果存入一个 K-V 型数据库中。这里的 key 就是单词,value 对应于单词数。从上面的讨论中你应该已经明白了仅仅存储计数结果是无法确定某个 batch 中的tuple 是否已经被处理过的。所以,现在你应该将 txid 作为一种原子化的值与计数值一起存入数据库。随后,在更新计数值的时候,你就可以将数据库中的 txid 与当前处理的 batch 的 txid 进行比对。如果两者相同,你就可以跳过更新操作 —— 由于 Trident 的强有序性处理机制,可以确定数据库中的值是对应于当前的 batch 的。如果两者不同,你就可以放心地增加计数值。由于一个 batch 的 txid 永远不会改变,而且 Trident 能够保证 state 的更新操作完全是按照 batch 的顺序进行的,所以,这样的处理逻辑是完全可行的。 下面来看一个例子。假如你正在处理 txid 3,其中包含有以下几个 tuple: ["man"] ["man"] ["dog"] 假如数据库中有以下几个 key-value 对: man => [count=3, txid=1] dog => [count=4, txid=3] apple => [count=10, txid=2] 其中与 “man” 相关联的 txid 为 1。由于当前处理的 txid 为 3,你就可以确定当前处理的 batch 与数据库中存储的值无关,这样你就可以放心地将 “man” 的计数值加上 2 并更新 txid 为 3。另一方面,由于 “dog” 的 txid 与当前的 txid 相同,所以,“dog” 的计数是之前已经处理过的,现在不能再对数据库中的计数值进行更新操作。这样,在结束 txid3 的更新操作之后,数据库中的结果就会变成这样: man => [count=5, txid=3] dog => [count=4, txid=3] apple => [count=10, txid=2] 现在我们再来讨论一下“模糊事务型” spout。 模糊事务型 spout(Opaque transactional spouts) 前面已经提到过,模糊事务型 spout 不能保证一个 txid 对应的 batch 中包含的 tuple 完全一致。模糊事务型 spout 有以下的特性: 每个 tuple 都会通过某个 batch 处理完成。不过,在 tuple 处理失败的时候,tuple 有可能继续在另一个 batch 中完成处理,而不一定是在原先的 batch 中完成处理。 OpaqueTridentKafkaSpout就具有这样的特性,同时它对 Kafka 节点的丢失问题具有很好的容错性。OpaqueTridentKafkaSpout在发送一个 batch 的时候总会总上一个 batch 结束的地方开始发送新 tuple。这一点可以保证 tuple 不会被遗漏,而且也不会被多个 batch 处理。 不过,模糊事务型 spout 的缺点就在于不能通过 txid 来识别数据库中的 state 是否是已经处理过的。这是因为在 state 的更新的过程中,batch 有可能会发生变化。 在这种情况下,你应该在数据库中存储更多的 state 信息。除了一个结果值和 txid 之外,你还应该存入前一个结果值。我们再以上面的计数值的例子来分析以下这个问题。假如你的 batch 的部分计数值是 “2”,现在你需要应用一个更新操作。假定现在数据库中的值是这样的: { value = 4, prevValue = 1, txid = 2 } 情形1:假如当前处理的 txid 为 3,这与数据库中的 txid 不同。这时可以将 “prevValue” 的值设为 “value” 的值,再为 “value” 的值加上部分计数的结果并更新 txid。执行完这一系列操作之后的数据库中的值就会变成这样: { value = 6, prevValue = 4, txid = 3 } 情形2:如果当前处理的 txid 为 2,也就是和数据库中存储的 txid 一致,这种情况下的处理逻辑与上面的 txid 不一致的情况又有所不同。因为此时你会知道数据库中的更新操作是由上一个拥有相同 txid 的batch 做出的。不过那个 batch 有可能与当前的 batch 并不相同,所以你需要忽略它的操作。这个时候,你应该将 “prevValue” 加上 batch 中的部分计数值来计算新的 “value”。在这个操作之后数据库中的值就会变成这样: { value = 3, prevValue = 1, txid = 2 } 这种方法之所以可行是因为 Trident 具有强顺序性处理的特性。一旦 Trident 开始处理一个新的 batch 的状态更新操作,它永远不会回到过去的 batch 的处理上。同时,由于模糊事务型 spout 会保证 batch 之间不会存在重复 —— 每个 tuple 只会被某一个 batch 完成处理 —— 所以你可以放心地使用 prevValue 来更新 value。 非事务型 spout(Non-transactional spouts) 非事务型 spout 不能为 batch 提供任何的安全性保证。非事务型 spout 有可能提供一种“至多一次”的处理模型,在这种情况下 batch 处理失败后 tuple 并不会重新处理;也有可能提供一种“至少一次”的处理模型,在这种情况下可能会有多个 batch 分别处理某个 tuple。总之,此类 spout 不能提供“恰好一次”的语义。 不同类型的 Spout 与 State 的总结 下图显示了不同的 spout/state 的组合是否支持恰好一次的消息处理语义: 模糊事务型 state 具有最好的容错性特征,不过这是以在数据库中存储更多的内容为代价的(一个 txid 和两个 value)。事务型 state 要求的存储空间相对较小,但是它的缺点是只对事务型 spout 有效。相对的,非事务型要求的存储空间最少,但是它也不能提供任何的恰好一次的消息执行语义。 你选择 state 与 spout 的时候必须在容错性与存储空间占用之间权衡。可以根据你的应用的需求来确定哪种组合最适合你。 State API 从上文的描述中你已经了解到了恰好一次的消息执行语义的原理是多么的复杂。不过作为用户你并不需要处理这些复杂的 txid 比对、多值存储等操作,Trident 已经在 State 中封装了所有的容错性处理逻辑,你只需要像下面这样写代码即可: TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count")) .parallelismHint(6); 所有处理模糊事务型 state 的逻辑已经封装在MemcachedState.opaque的调用中了。另外,状态更新都会自动调整为批处理操作,这样可以减小与数据库的反复交互的资源损耗。 基本的State接口只有两个方法: public interface State { void beginCommit(Long txid); // 对于类似于在 DRPC 流上进行 partitionPersist 的操作,此方法可以为空 void commit(Long txid); } 前面已经说过,state 更新操作的开始时和结束时都会获取一个 txid。对于你的 state 怎么工作,你在其中使用什么样的方法执行更新操作,或者使用什么样的方法从 state 中读取数据,Trident 并不关心。 假如你有一个包含有用户的地址信息的定制数据库,你需要使用 Trident 与该数据库交互。你的 State 的实现就会包含有用于获取与设置用户信息的方法,比如下面这样: public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocation(long userId, String location) { // code to access database and set location } public String getLocation(long userId) { // code to get location from database } } 接着你就可以为 Trident 提供一个 StateFactory 来创建 Trident 任务内部的 State 对象的实例。对应于你的数据库(LocationDB)的 StateFactory 大概是这样的: public class LocationDBFactory implements StateFactory { public State makeState(Map conf, int partitionIndex, int numPartitions) { return new LocationDB(); } } Trident 提供了一个用于查询 state 数据源的QueryFunction接口,以及一个用于更新 state 数据源的StateUpdater接口。例如,我们可以写一个查询 LocationDB 中的用户地址信息的 “QueryLocation”。让我们从你在拓扑中使用这个操作的方式开始。假如在拓扑中需要读取输入流中的 userid 信息: TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStaticState(new LocationDBFactory()); topology.newStream("myspout", spout) .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location")) 这里的QueryLocation的实现可能是这样的: public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<String> ret = new ArrayList(); for(TridentTuple input: inputs) { ret.add(state.getLocation(input.getLong(0))); } return ret; } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } } QueryFunction的执行包含两个步骤。首先,Trident 会将读取的一些数据中汇总为一个 batch 传入 batchRetrieve 方法中。在这个例子中,batchRetrieve 方法会收到一些用户 id。然后 batchRetrieve 会返回一个与输入 tuple 列表大小相同的队列。结果队列的第一个元素与第一个输入 tuple 对应,第二个元素与第二个输入 tuple 相对应,以此类推。 你会发现这段代码并没有发挥出 Trident 批处理的优势,因为这段代码仅仅一次查询一下 LocationDB。所以,实现 LocationDB 的更好的方式应该是这样的: public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocationsBulk(List<Long> userIds, List<String> locations) { // set locations in bulk } public List<String> bulkGetLocations(List<Long> userIds) { // get locations in bulk } } 然后,你可以这样实现QueryLocation方法: public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<Long> userIds = new ArrayList<Long>(); for(TridentTuple input: inputs) { userIds.add(input.getLong(0)); } return state.bulkGetLocations(userIds); } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } } 这段代码大幅减少了域数据库的IO,具有更高的执行效率。 你需要使用StateUpdater接口来更新 state。下面是一个更新 LocationDB 的地址信息的 StateUpdater 实现: public class LocationUpdater extends BaseStateUpdater<LocationDB> { public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) { List<Long> ids = new ArrayList<Long>(); List<String> locations = new ArrayList<String>(); for(TridentTuple t: tuples) { ids.add(t.getLong(0)); locations.add(t.getString(1)); } state.setLocationsBulk(ids, locations); } } 然后你就可以在 Trident 拓扑中这样使用这个操作: TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStream("locations", locationsSpout) .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater()) partitionPersist操作会更新 state 数据源。StateUpdater接收 State 和一批 tuple 作为输入,然后更新这个 State。上面的代码仅仅从输入 tuple 中抓取 userid 和 location 信息,然后对 State 执行一个批处理更新操作。 在 Trident 拓扑更新 LocationDB 之后,partitionPersist会返回一个表示更新后状态的TridentState对象。随后你就可以在拓扑的其他地方使用stateQuery方法对这个 state 执行查询操作。 你也许注意到了 StateUpdater 中有一个 TridentCollector 参数。发送到这个 collector 的 tuple 会进入一个“新的数值流”中。在这个例子里向这个新的流发送 tuple 并没有意义,不过如果你需要处理类似于更新数据库中的计数值这样的操作,你可以考虑将更新后的技术结果发送到这个流中。可以通过TridentState.newValuesStream方法来获取新的流的数据。 persistentAggregate Trident 使用一个称为persistentAggregate的方法来更新 State。你已经在前面的数据流单词统计的例子里见过了这个方法,这里再写一遍: TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) partitionPersist 是一个接收 Trident 聚合器作为参数并对 state 数据源进行更新的方法,persistentAggregate 就是构建于 partitionPersist 上层的一个编程抽象。在这个例子里,由于是一个分组数据流(grouped stream),Trident 需要你提供一个实现MapState接口的 state。被分组的域就是 state 中的 key,而聚合的结果就是 state 中的 value。MapState接口是这样的: public interface MapState<T> extends State { List<T> multiGet(List<List<Object>> keys); List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); void multiPut(List<List<Object>> keys, List<T> vals); } 而当你在非分组数据流上执行聚合操作时(全局聚合操作),Trident 需要你提供一个实现了Snapshottable接口的对象: public interface Snapshottable<T> extends State { T get(); T update(ValueUpdater updater); void set(T o); } MemoryMapState与MemcachedState都实现了上面两个接口。 实现 Map State 接口 实现MapState接口非常简单,Trident 几乎已经为你做好了所有的准备工作。OpaqueMap、TransactionalMap、与NonTransactionalMap类都分别实现了各自的容错性语义。你只需要为这些类提供一个用于对不同的 key/value 进行 multiGets 与 multiPuts 处理的 IBackingMap 实现类。IBackingMap接口是这样的: public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); } OpaqueMap 会使用OpaqueValue作为 vals 参数来调用 multiPut 方法,TransactionalMap 会使用TransactionalValue作为参数,而 NonTransactionalMap 则直接将拓扑中的对象传入。 Trident 也提供了一个CachedMap用于实现 K-V map 的自动 LRU 缓存功能。 最后,Trident 还提供了一个SnapshottableMap类,该类通过将全局聚合结果存入一个固定的 key 中的方法将 MapState 对象转化为一个 Snapshottable 对象。 可以参考MemcachedState的实现来了解如何将这些工具结合到一起来提供一个高性能的 MapState。MemcachedState支持选择模糊事务型、事务型或者非事务型语义。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— Trident Spouts

与一般的 Storm API 一样,spout 也是 Trident 拓扑的数据来源。不过,为了实现更复杂的功能服务,Trident Spout 在普通的 Storm Spout 之上另外提供了一些 API 接口。 数据源、数据流以及基于数据流更新 state(比如数据库)的操作,他们之间的耦合关系是不可避免的。Trident State一文中有这方面的详细解释,理解他们之间的这种联系对于理解 spout 的运作方式非常重要。 Trident 拓扑中的大部分 spout 都是非事务型 spout。在 Trident 拓扑中可以使用普通的IRichSpout接口来创建数据流: TridentTopology topology = new TridentTopology(); topology.newStream("myspoutid", new MyRichSpout()); Trident 拓扑中的所有 spout 都必须有一个唯一的标识,而且这个标识必须在整个 Storm 集群中都是唯一的。Trident 需要使用这个标识来存储 spout 从 ZooKeeper 中消费的元数据(metadata),包括 txid 以及其他相关的 spout 元数据。 你可以使用以下配置项来设置用于存储 spout 元数据的 ZooKeeper 地址(一般情况下不需要设置以下选项,因为 Storm 默认会直接使用集群的 ZooKeeper 服务器来存储数据 —— 译者注): transactional.zookeeper.servers:ZooKeeper 的服务器列表 transactional.zookeeper.port:ZooKeeper 集群的端口 transactional.zookeeper.root:元数据在 ZooKeeper 中存储的根目录。元数据会直接存储在该设置目录下。 管道 默认情况下,Trident 每次处理只一个 batch,知道该 batch 处理成功或者失败之后才会开始处理其他的 batch。你可以通过将 batch 管道化来提高吞吐率,降低每个 batch 的处理延时。同时处理的 batch 的最大数量可以通过topology.max.spout.pending来进行配置。 不过,即使在同时处理多个 batch 的情况下,Trident 也会按照 batch 的顺序来更新 state。例如,假如你正在处理一个将全局计数结果整合并更新到数据库中的任务,那么在你向数据库中更新 batch1 的计数结果时,你同时可以继续处理 batch2、batch3 甚至 batch10 的计数工作。不过,Trident 只会在 batch1 的 state 更新结束之后才会处理后续 batch 的 state 更新操作。这是实现恰好一次处理的语义的必要基础,我们已经在Trident State一文中讨论了这一点。 Trident spout 类型 下面列出了一些可用的 spout API 接口: ITridentSpout:这是最常用的 API,支持事务型和模糊事务型的语义实现。不过一般会根据需要使用它的某个已有的实现,而不是直接实现该接口。 IBatchSpout:非事务型 spout,每次会输出一个 batch 的 tuple。 IPartitionedTridentSpout:可以从分布式数据源(比如一个集群或者 Kafka 服务器)读取数据的事务型 spout。 OpaquePartitionedTridentSpout:可以从分布式数据源读取数据的模糊事务型 spout。 当然,正如这篇教程的开头提到的,除了这些 API 之外,你还可以使用普通的IRichSpout。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— 本地模式

本地模式是一种在本地进程中模拟 Storm 集群的工作模式,对于开发和测试拓扑很有帮助。在本地模式下运行拓扑与在集群模式下运行拓扑的方式很相似。 创建一个进程内的“集群”只需要使用LocalCluster类即可,例如: import backtype.storm.LocalCluster; LocalCluster cluster = new LocalCluster(); 随后,你就可以使用LocalCluster中的submitTopology方法来提交拓扑了。与StormSubmitter中相应的方法相似,submitTopology接收一个拓扑名称、拓扑配置以及拓扑对象作为输入参数。你也可以以拓扑名称为参数,使用killTopology方法来 kill 掉对应的拓扑。 使用以下语句关闭本地模式集群运行: cluster.shutdown(); 本地模式的常用配置 你可以在这里找到完整的配置项列表。以下是几个比较有用的配置项说明: Config.TOPOLOGY_MAX_TASK_PARALLELISM:该配置项设置了单个组件(bolt/spout)的线程数上限。生产环境下的拓扑往往含有很高的并行度(数百个线程),导致在本地模式下测试拓扑时会有较大的负载。这个配置项可以让你很容易地控制并行度。 Config.TOPOLOGY_DEBUG:此配置项设置为 true 时 Storm 会打印出 spout 或者 bolt 每一次发送消息的日志记录。这个功能对于调试拓扑很有用。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— 基础概念

Storm 系统中包含以下几个基本概念: 拓扑(Topologies) 流(Streams) 数据源(Spouts) 数据流处理组件(Bolts) 数据流分组(Stream groupings) 可靠性(Reliability) 任务(Tasks) 工作进程(Workers) 译者注:由于 Storm 的几个基础概念无论是直译还是意译均不够清晰,而且还会让习惯了 Storm 编程模型的读者感到困惑,因此后文在提及这些概念时大多还会以英文原文出现,希望大家能够谅解。 拓扑(Topologies) Storm 的拓扑是对实时计算应用逻辑的封装,它的作用与 MapReduce 的任务(Job)很相似,区别在于 MapReduce 的一个 Job 在得到结果之后总会结束,而拓扑会一直在集群中运行,直到你手动去终止它。拓扑还可以理解成由一系列通过数据流(Stream Grouping)相互关联的 Spout 和 Bolt 组成的的拓扑结构。Spout 和 Bolt 称为拓扑的组件(Component)。我们会在后文中给出这些概念的解释。 相关资料 TopologyBuilder:在 Java 中使用此类构造拓扑 在生产环境中运行拓扑 本地模式:通过本文学习如何在本地模式中开发、测试拓扑 数据流(Streams) 数据流(Streams)是 Storm 中最核心的抽象概念。一个数据流指的是在分布式环境中并行创建、处理的一组元组(tuple)的无界序列。数据流可以由一种能够表述数据流中元组的域(fields)的模式来定义。在默认情况下,元组(tuple)包含有整型(Integer)数字、长整型(Long)数字、短整型(Short)数字、字节(Byte)、双精度浮点数(Double)、单精度浮点数(Float)、布尔值以及字节数组等基本类型对象。当然,你也可以通过定义可序列化的对象来实现自定义的元组类型。 在声明数据流的时候需要给数据流定义一个有效的 id。不过,由于在实际应用中使用最多的还是单一数据流的 Spout 与 Bolt,这种场景下不需要使用 id 来区分数据流,因此可以直接使用OutputFieldsDeclarer来定义“无 id”的数据流。实际上,系统默认会给这种数据流定义一个名为“default”的 id。 相关资料 元组(Tuple):数据流由多个元组构成 OutputFieldsDeclarer:用于声明数据流和数据流对应的模式 序列化(Serialization):关于 Storm 元组的动态类型以及声明自定义序列化模型的相关内容 ISerialization:自定义的序列化模型必须实现该接口 CONFIG.TOPOLOGY_SERIALIZATIONS:自定义的序列化模型可以通过这个配置项实现注册 数据源(Spouts) 数据源(Spout)是拓扑中数据流的来源。一般 Spout 会从一个外部的数据源读取元组然后将他们发送到拓扑中。根据需求的不同,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源。一个可靠的 Spout 能够在它发送的元组处理失败时重新发送该元组,以确保所有的元组都能得到正确的处理;相对应的,不可靠的 Spout 就不会在元组发送之后对元组进行任何其他的处理。 一个 Spout 可以发送多个数据流。为了实现这个功能,可以先通过OutputFieldsDeclarer的declareStream方法来声明定义不同的数据流,然后在发送数据时在SpoutOutputCollector的emit方法中将数据流 id 作为参数来实现数据发送的功能。 Spout 中的关键方法是nextTuple。顾名思义,nextTuple要么会向拓扑中发送一个新的元组,要么会在没有可发送的元组时直接返回。需要特别注意的是,由于 Storm 是在同一个线程中调用所有的 Spout 方法,nextTuple不能被 Spout 的任何其他功能方法所阻塞,否则会直接导致数据流的中断(关于这一点,阿里的 JStorm 修改了 Spout 的模型,使用不同的线程来处理消息的发送,这种做法有利有弊,好处在于可以更加灵活地实现 Spout,坏处在于系统的调度模型更加复杂,如何取舍还是要看具体的需求场景吧——译者注)。 Spout 中另外两个关键方法是ack和fail,他们分别用于在 Storm 检测到一个发送过的元组已经被成功处理或处理失败后的进一步处理。注意,ack和fail方法仅仅对上述“可靠的” Spout 有效。 相关资料 IRichSpout:这是实现 Spout 的接口 消息的可靠性处理 数据流处理组件(Bolts) 拓扑中所有的数据处理均是由 Bolt 完成的。通过数据过滤(filtering)、函数处理(functions)、聚合(aggregations)、联结(joins)、数据库交互等功能,Bolt 几乎能够完成任何一种数据处理需求。 一个 Bolt 可以实现简单的数据流转换,而更复杂的数据流变换通常需要使用多个 Bolt 并通过多个步骤完成。例如,将一个微博数据流转换成一个趋势图像的数据流至少包含两个步骤:其中一个 Bolt 用于对每个图片的微博转发进行滚动计数,另一个或多个 Bolt 将数据流输出为“转发最多的图片”结果(相对于使用2个Bolt,如果使用3个 Bolt 你可以让这种转换具有更好的可扩展性)。 与 Spout 相同,Bolt 也可以输出多个数据流。为了实现这个功能,可以先通过OutputFieldsDeclarer的declareStream方法来声明定义不同的数据流,然后在发送数据时在OutputCollector的emit方法中将数据流 id 作为参数来实现数据发送的功能。 在定义 Bolt 的输入数据流时,你需要从其他的 Storm 组件中订阅指定的数据流。如果你需要从其他所有的组件中订阅数据流,你就必须要在定义 Bolt 时分别注册每一个组件。对于声明为默认 id(即上文中提到的“default”——译者注)的数据流,InputDeclarer支持订阅此类数据流的语法糖。也就是说,如果需要订阅来自组件“1”的数据流,declarer.shuffleGrouping("1")与declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)两种声明方式是等价的。 Bolt 的关键方法是execute方法。execute方法负责接收一个元组作为输入,并且使用OutputCollector对象发送新的元组。如果有消息可靠性保障的需求,Bolt 必须为它所处理的每个元组调用OutputCollector的ack方法,以便 Storm 能够了解元组是否处理完成(并且最终决定是否可以响应最初的 Spout 输出元组树)。一般情况下,对于每个输入元组,在处理之后可以根据需要选择不发送还是发送多个新元组,然后再响应(ack)输入元组。IBasicBolt接口能够实现元组的自动应答。 在 Bolt 中启动新线程来进行异步处理是一种非常好的方式,因为OutputCollector是线程安全的对象,可以在任意时刻被调用(此处译者保留意见,由于 Storm 的并发设计和集群的弹性扩展机制,在 Bolt 中新建的线程可能存在一定的不可控风险——译者注)。 请注意OutputCollector不是线程安全的对象,所有的 emit、ack 和 fail 操作都需要在同一个线程中进行处理。更多信息请参考问题与解决一文。 相关资料 IRichBolt:用于定义 Bolt 的基本接口 IBasicBolt: 用于定义带有过滤或者其他简单的函数操作功能的 Bolt 的简便接口 OutputCollector:Bolt 使用此类来发送数据流 消息的可靠性处理 数据流分组(Stream groupings) 为拓扑中的每个 Bolt 的确定输入数据流是定义一个拓扑的重要环节。数据流分组定义了在 Bolt 的不同任务(tasks)中划分数据流的方式。 在 Storm 中有八种内置的数据流分组方式(原文有误,现在已经已经有八种分组模型——译者注),而且你还可以通过CustomStreamGrouping接口实现自定义的数据流分组模型。这八种分组分时分别为: 随机分组(Shuffle grouping):这种方式下元组会被尽可能随机地分配到 Bolt 的不同任务(tasks)中,使得每个任务所处理元组数量能够能够保持基本一致,以确保集群的负载均衡。 域分组(Fields grouping):这种方式下数据流根据定义的“域”来进行分组。例如,如果某个数据流是基于一个名为“user-id”的域进行分组的,那么所有包含相同的“user-id”的元组都会被分配到同一个任务中,这样就可以确保消息处理的一致性。 部分关键字分组(Partial Key grouping):这种方式与域分组很相似,根据定义的域来对数据流进行分组,不同的是,这种方式会考虑下游 Bolt 数据处理的均衡性问题,在输入数据源关键字不平衡时会有更好的性能1。感兴趣的读者可以参考这篇论文,其中详细解释了这种分组方式的工作原理以及它的优点。 完全分组(All grouping):这种方式下数据流会被同时发送到 Bolt 的所有任务中(也就是说同一个元组会被复制多份然后被所有的任务处理),使用这种分组方式要特别小心。 全局分组(Global grouping):这种方式下所有的数据流都会被发送到 Bolt 的同一个任务中,也就是 id 最小的那个任务。 非分组(None grouping):使用这种方式说明你不关心数据流如何分组。目前这种方式的结果与随机分组完全等效,不过未来 Storm 社区可能会考虑通过非分组方式来让 Bolt 和它所订阅的 Spout 或 Bolt 在同一个线程中执行。 直接分组(Direct grouping):这是一种特殊的分组方式。使用这种方式意味着元组的发送者可以指定下游的哪个任务可以接收这个元组。只有在数据流被声明为直接数据流时才能够使用直接分组方式。使用直接数据流发送元组需要使用OutputCollector的其中一个emitDirect方法。Bolt 可以通过TopologyContext来获取它的下游消费者的任务 id,也可以通过跟踪OutputCollector的emit方法(该方法会返回它所发送元组的目标任务的 id)的数据来获取任务 id。 本地或随机分组(Local or shuffle grouping):如果在源组件的 worker 进程里目标 Bolt 有一个或更多的任务线程,元组会被随机分配到那些同进程的任务中。换句话说,这与随机分组的方式具有相似的效果。 相关资料 TopologyBuilder:使用此类构造拓扑 InputDeclarer:在TopologyBuilder中调用setBolt方法时会返回这个对象的实例,通过该对象就可以定义 Bolt 的输入数据流以及数据流的分组方式 CoordinatedBolt:这个 Bolt 主要用于分布式 RPC 拓扑,其中大量使用了直接数据流与直接分组模型 可靠性(Reliability) Storm 可以通过拓扑来确保每个发送的元组都能得到正确处理。通过跟踪由 Spout 发出的每个元组构成的元组树可以确定元组是否已经完成处理。每个拓扑都有一个“消息延时”参数,如果 Storm 在延时时间内没有检测到元组是否处理完成,就会将该元组标记为处理失败,并会在稍后重新发送该元组。 为了充分利用 Storm 的可靠性机制,你必须在元组树创建新结点的时候以及元组处理完成的时候通知 Storm。这个过程可以在 Bolt 发送元组时通过OutputCollector实现:在emit方法中实现元组的锚定(Anchoring),同时使用ack方法表明你已经完成了元组的处理。 关于可靠性保障的更多内容可以参考这篇文章:消息的可靠性处理。 任务(Tasks) 在 Storm 集群中每个 Spout 和 Bolt 都由若干个任务(tasks)来执行。每个任务都与一个执行线程相对应。数据流分组可以决定如何由一组任务向另一组任务发送元组。你可以在TopologyBuilder的setSpout方法和setBolt方法中设置 Spout/Bolt 的并行度。 工作进程(Workers) 拓扑是在一个或多个工作进程(worker processes)中运行的。每个工作进程都是一个实际的 JVM 进程,并且执行拓扑的一个子集。例如,如果拓扑的并行度定义为300,工作进程数定义为50,那么每个工作进程就会执行6个任务(进程内部的线程)。Storm 会在所有的 worker 中分散任务,以便实现集群的负载均衡。 相关资料 Config.TOPOLOGY_WORKERS:这个配置项用于设置拓扑的工作进程数 1Partial Key grouping 方式目前仅支持开发版,尚未加入 Storm 的正式发行版,不过可以通过CustomStreamGrouping间接实现该分组功能,具体的实现可以参考PartialKeyGrouping源代码。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

Apache Storm 官方文档 —— 容错性

本文通过问答的形式解释了 Storm 的容错性原理。 工作进程(worker)死亡时会发生什么? 工作进程死亡的时候,supervisor 会重新启动这个进程。如果在启动过程中仍然一直失败,并且无法向 Nimbus 发送心跳,Nimbus 就会将这个 worker 重新分配到其他机器上去。 节点故障时会发生什么? 一个节点(集群中的工作节点,非 Nimbus 所在服务器)故障时,该节点上所有的任务(tasks)都会超时,然后 Nimbus 在检测到超时后会将所有的这些任务重新分配到其他机器上去。 Nimbus 或者 Supervisor 的后台进程挂掉时会发生什么 Nimbus 和 Supervisor 的后台进程本身是设计为快速失败(无论何时发生了异常情况之后都会启动自毁操作)和无状态(所有的状态由 ZooKeeper 负责管理)的。正如配置 Storm 集群这篇文章中所述,Nimbus 和 Supervisor 的后台进程实际上是在后台监控工具的监控之下运行的。所以,如果 Nimbus 或者 Supervisor 进程挂掉,他们就会静默地自动重启。 值得一提的是,Nimbus 和 Supervisor 的故障不会影响任何工作进程。这一点与 Hadoop 形成了鲜明对比,在 Hadoop 中 JobTracker 的故障会导致所有正在运行的 job 运行失败(Hadoop 2.x 中引入的 Yarn 架构已经提升了 Hadoop 系统的稳定性,目前的架构并不会如这里所说的那么不堪——译者注)。 Nimbus 是系统的单故障点1吗? 如果你的 Nimbus 节点出现故障无法访问,集群中的 worker 仍然会继续保持运行。另外,此时 Supervisor 也仍然会正常工作,在 worker 挂掉时自动重启挂掉的进程。但是,由于缺少 Nimbus 的协调,worker 就不会在必要的时候重新分配到不同的机器中(看上去好像你丢失了一个 worker)。 所以这个问题的答案是,Nimbus 确实稍微有一点像 SPOF(单故障点,Single Point of Failure)。不过在实际应用中,Nimbus 的故障从来就不是什么问题。未来的开发计划中还会考虑让 Nimbus 具备更好的可用性。 Storm 是如何保障消息的完全处理的? 对于节点故障或者消息丢失的情况,Storm 提供了一套完善的机制保障所有的消息都能够得到正确处理。消息的可靠性保障一文中解释了这方面更多的技术细节。 1单故障点是指在一个系统中的某个在失效或停止运转后会导致整个系统不能工作的部件,具体概念可以参考维基百科。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》Spark编程指南

Spark编程指南 概述 总体上来说,每个Spark应用都包含一个驱动器(driver)程序,驱动器运行用户的main函数,并在集群上执行各种并行操作。 Spark最重要的一个抽象概念就是弹性分布式数据集(resilient distributed dataset – RDD),RDD是一个可分区的元素集合,其包含的元素可以分布在集群各个节点上,并且可以执行一些分布式并行操作。RDD通常是通过,HDFS(或者其他Hadoop支持的文件系统)上的文件,或者驱动器中的Scala集合对象,来创建或转换得到;其次,用户也可以请求Spark将RDD持久化到内存里,以便在不同的并行操作里复用之;最后,RDD具备容错性,可以从节点失败中自动恢复数据。 Spark第二个重要抽象概念是共享变量,共享变量是一种可以在并行操作之间共享使用的变量。默认情况下,当Spark把一系列任务调度到不同节点上运行时,Spark会同时把每个变量的副本和任务代码一起发送给各个节点。但有时候,我们需要在任务之间,或者任务和驱动器之间共享一些变量。Spark提供了两种类型的共享变量:广播变量和累加器,广播变量可以用来在各个节点上缓存数据,而累加器则是用来执行跨节点的“累加”操作,例如:计数和求和。 本文将会使用Spark所支持的所有语言来展示Spark的特性。如果你能启动Spark的交互式shell动手实验一下,效果会更好(对scala请使用bin/spark-shell,而对于python,请使用bin/pyspark)。 链接Spark Scala Java Python Spark 1.6.0 使用了Scala 2.10。用Scala写应用的话,你需要使用一个兼容的Scala版本(如:2.10.X) 同时,如果你需要在maven中依赖Spark,可以用如下maven工件标识: groupId = org.apache.spark artifactId = spark-core_2.10 version = 1.6.0 另外,如果你需要访问特定版本的HDFS,那么你可能需要增加相应版本的hadoop-client依赖项,其maven工件标识如下: groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version> 最后,你需要如下,在你的代码里导入一些Spark class: import org.apache.spark.SparkContext import org.apache.spark.SparkConf (在Spark 1.3.0之前,你需要显示的import org.apache.spark.SparkContext._来启用这些重要的隐式转换) 初始化Spark Scala Java Python Spark应用程序需要做的第一件事就是创建一个SparkContext对象,SparkContext对象决定了Spark如何访问集群。而要新建一个SparkContext对象,你还得需要构造一个SparkConf对象,SparkConf对象包含了你的应用程序的配置信息。 每个JVM进程中,只能有一个活跃(active)的SparkContext对象。如果你非要再新建一个,那首先必须将之前那个活跃的SparkContext 对象stop()掉。 val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf) appName参数值是你的应用展示在集群UI上的应用名称。master参数值是Spark, Mesos or YARN cluster URL或者特殊的“local”(本地模式)。实际上,一般不应该将master参数值硬编码到代码中,而是应该用spark-submit脚本的参数来设置。然而,如果是本地测试或单元测试中,你可以直接在代码里给master参数写死一个”local”值。 使用shell Scala Python 在Spark shell中,默认已经为你新建了一个SparkContext对象,变量名为sc。所以spark-shell里不能自建SparkContext对象。你可以通过–master参数设置要连接到哪个集群,而且可以给–jars参数传一个逗号分隔的jar包列表,以便将这些jar包加到classpath中。你还可以通过–packages设置逗号分隔的maven工件列表,以便增加额外的依赖项。同样,还可以通过–repositories参数增加maven repository地址。下面是一个示例,在本地4个CPU core上运行的实例: $./bin/spark-shell –masterlocal[4] 或者,将code.jar添加到classpath下: $ ./bin/spark-shell --master local[4] --jars code.jar 通过maven标识添加依赖: $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1" spark-shell –help可以查看完整的选项列表。实际上,spark-shell是在后台调用spark-submit来实现其功能的(spark-submitscript.) 弹性分布式数据集(RDD) Spark的核心概念是弹性分布式数据集(RDD),RDD是一个可容错、可并行操作的分布式元素集合。总体上有两种方法可以创建RDD对象:由驱动程序中的集合对象通过并行化操作创建,或者从外部存储系统中数据集加载(如:共享文件系统、HDFS、HBase或者其他Hadoop支持的数据源)。 并行化集合 Scala Java Python 并行化集合是以一个已有的集合对象(例如:Scala Seq)为参数,调用 SparkContext.parallelize() 方法创建得到的RDD。集合对象中所有的元素都将被复制到一个可并行操作的分布式数据集中。例如,以下代码将一个1到5组成的数组并行化成一个RDD: val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data) 一旦创建成功,该分布式数据集(上例中的distData)就可以执行一些并行操作。如,distData.reduce((a, b) => a + b),这段代码会将集合中所有元素加和。后面我们还会继续讨论分布式数据集上的各种操作。 并行化集合的一个重要参数是分区(partition),即这个分布式数据集可以分割为多少片。Spark中每个任务(task)都是基于分区的,每个分区一个对应的任务(task)。典型场景下,一般每个CPU对应2~4个分区。并且一般而言,Spark会基于集群的情况,自动设置这个分区数。当然,你还是可以手动控制这个分区数,只需给parallelize方法再传一个参数即可(如:sc.parallelize(data, 10) )。注意:Spark代码里有些地方仍然使用分片(slice)这个术语,这只不过是分区的一个别名,主要为了保持向后兼容。 外部数据集 Scala Java Python Spark 可以通过Hadoop所支持的任何数据源来创建分布式数据集,包括:本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark 支持的文件格式包括:文本文件(text files)、SequenceFiles,以及其他Hadoop 支持的输入格式(InputFormat)。 文本文件创建RDD可以用 SparkContext.textFile 方法。这个方法输入参数是一个文件的URI(本地路径,或者 hdfs://,s3n:// 等),其输出RDD是一个文本行集合。以下是一个简单示例: scala> val distFile = sc.textFile("data.txt") distFile: RDD[String] = MappedRDD@1d4cee08 创建后,distFile 就可以执行数据集的一些操作。比如,我们可以把所有文本行的长度加和:distFile.map(s => s.length).reduce((a, b) => a + b) 以下是一些Spark读取文件的要点: 如果是本地文件系统,那么这个文件必须在所有的worker节点上能够以相同的路径访问到。所以要么把文件复制到所有worker节点上同一路径下,要么挂载一个共享文件系统。 所有Spark基于文件输入的方法(包括textFile)都支持输入参数为:目录,压缩文件,以及通配符。例如:textFile(“/my/directory”), textFile(“/my/directory/*.txt”), 以及 textFile(“/my/directory/*.gz”) textFile方法同时还支持一个可选参数,用以控制数据的分区个数。默认地,Spark会为文件的每一个block创建一个分区(HDFS上默认block大小为64MB),你可以通过调整这个参数来控制数据的分区数。注意,分区数不能少于block个数。 除了文本文件之外,Spark的Scala API还支持其他几种数据格式: SparkContext.wholeTextFiles 可以读取一个包含很多小文本文件的目录,并且以 (filename, content) 键值对的形式返回结果。这与textFile 不同,textFile只返回文件的内容,每行作为一个元素。 对于SequenceFiles,可以调用 SparkContext.sequenceFile[K, V],其中 K 和 V 分别是文件中key和value的类型。这些类型都应该是Writable接口的子类, 如:IntWritableandText等。另外,Spark 允许你为一些常用Writable指定原生类型,例如:sequenceFile[Int, String] 将自动读取 IntWritable 和 Text。 对于其他的Hadoop InputFormat,你可以用 SparkContext.hadoopRDD 方法,并传入任意的JobConf 对象和 InputFormat,以及key class、value class。这和设置Hadoop job的输入源是同样的方法。你还可以使用 SparkContext.newAPIHadoopRDD,该方法接收一个基于新版Hadoop MapReduce API (org.apache.hadoop.mapreduce)的InputFormat作为参数。 RDD.saveAsObjectFile 和 SparkContext.objectFile 支持将RDD中元素以Java对象序列化的格式保存成文件。虽然这种序列化方式不如Avro效率高,却为保存RDD提供了一种简便方式。 RDD算子 RDD支持两种类型的算子(operation):transformation算子 和 action算子;transformation算子可以将已有RDD转换得到一个新的RDD,而action算子则是基于数据集计算,并将结果返回给驱动器(driver)。例如,map是一个transformation算子,它将数据集中每个元素传给一个指定的函数,并将该函数返回结果构建为一个新的RDD;而 reduce是一个action算子,它可以将RDD中所有元素传给指定的聚合函数,并将最终的聚合结果返回给驱动器(还有一个reduceByKey算子,其返回的聚合结果是一个数据集)。 Spark中所有transformation算子都是懒惰的,也就是说,这些算子并不立即计算结果,而是记录下对基础数据集(如:一个数据文件)的转换操作。只有等到某个action算子需要计算一个结果返回给驱动器的时候,transformation算子所记录的操作才会被计算。这种设计使Spark可以运行得更加高效 – 例如,map算子创建了一个数据集,同时该数据集下一步会调用reduce算子,那么Spark将只会返回reduce的最终聚合结果(单独的一个数据)给驱动器,而不是将map所产生的数据集整个返回给驱动器。 默认情况下,每次调用action算子的时候,每个由transformation转换得到的RDD都会被重新计算。然而,你也可以通过调用persist(或者cache)操作来持久化一个RDD,这意味着Spark将会把RDD的元素都保存在集群中,因此下一次访问这些元素的速度将大大提高。同时,Spark还支持将RDD元素持久化到内存或者磁盘上,甚至可以支持跨节点多副本。 基础 Scala Java Python 以下简要说明一下RDD的基本操作,参考如下代码: val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b) 其中,第一行是从外部文件加载数据,并创建一个基础RDD。这时候,数据集并没有加载进内存除非有其他操作施加于lines,这时候的lines RDD其实可以说只是一个指向 data.txt 文件的指针。第二行,用lines通过map转换得到一个lineLengths RDD,同样,lineLengths也是懒惰计算的。最后,我们使用 reduce算子计算长度之和,reduce是一个action算子。此时,Spark将会把计算分割为一些小的任务,分别在不同的机器上运行,每台机器上都运行相关的一部分map任务,并在本地进行reduce,并将这些reduce结果都返回给驱动器。 如果我们后续需要重复用到 lineLengths RDD,我们可以增加一行: lineLengths.persist() 这一行加在调用 reduce 之前,则 lineLengths RDD 首次计算后,Spark会将其数据保存到内存中。 将函数传给Spark Scala Java Python Spark的API 很多都依赖于在驱动程序中向集群传递操作函数。以下是两种建议的实现方式: 匿名函数(Anonymous function syntax),这种方式代码量比较少。 全局单件中的静态方法。例如,你可以按如下方式定义一个 object MyFunctions 并传递其静态成员函数 MyFunctions.func1: object MyFunctions { def func1(s: String): String = { ... } } myRdd.map(MyFunctions.func1) 注意,技术上来说,你也可以传递一个类对象实例上的方法(不是单件对象),不过这回导致传递函数的同时,需要把相应的对象也发送到集群中各节点上。例如,我们定义一个MyClass如下: class MyClass { def func1(s: String): String = { ... } def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } } 如果我们 new MyClass 创建一个实例,并调用其 doStuff 方法,同时doStuff中的 map算子引用了该MyClass实例上的 func1 方法,那么接下来,这个MyClass对象将被发送到集群中所有节点上。rdd.map(x => this.func1(x)) 也会有类似的效果。 类似地,如果应用外部对象的成员变量,也会导致对整个对象实例的引用: class MyClass { val field = "Hello" def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } } 上面的代码对field的引用等价于 rdd.map(x => this.field + x),这将导致应用整个this对象。为了避免类似问题,最简单的方式就是,将field固执到一个本地临时变量中,而不是从外部直接访问之,如下: def doStuff(rdd: RDD[String]): RDD[String] = { val field_ = this.field rdd.map(x => field_ + x) } 理解闭包 Spark里一个比较难的事情就是,理解在整个集群上跨节点执行的变量和方法的作用域以及生命周期。Spark里一个频繁出现的问题就是RDD算子在变量作用域之外修改了其值。下面的例子,我们将会以foreach() 算子为例,来递增一个计数器counter,不过类似的问题在其他算子上也会出现。 示例 考虑如下例子,我们将会计算RDD中原生元素的总和,如果不是在同一个JVM中执行,其表现将有很大不同。例如,这段代码如果使用Spark本地模式(–master=local[n])运行,和在集群上运行(例如,用spark-submit提交到YARN上)结果完全不同。 Scala Java Python var counter = 0 var rdd = sc.parallelize(data) // Wrong: Don't do this!! rdd.foreach(x => counter += x) println("Counter value: " + counter) 本地模式 v.s. 集群模式 上面这段代码其行为是不确定的。在本地模式下运行,所有代码都在运行于单个JVM中,所以RDD的元素都能够被累加并保存到counter变量中,这是因为本地模式下,counter变量和驱动器节点在同一个内存空间中。 然而,在集群模式下,情况会更复杂,以上代码的运行结果就不是所预期的结果了。为了执行这个作业,Spark会将RDD算子的计算过程分割成多个独立的任务(task)- 每个任务分发给不同的执行器(executor)去执行。而执行之前,Spark需要计算闭包。闭包是由执行器执行RDD算子(本例中的foreach())时所需要的变量和方法组成的。闭包将会被序列化,并发送给每个执行器。由于本地模式下,只有一个执行器,所有任务都共享同样的闭包。而在其他模式下,情况则有所不同,每个执行器都运行于不同的worker节点,并且都拥有独立的闭包副本。 在上面的例子中,闭包中的变量会跟随不同的闭包副本,发送到不同的执行器上,所以等到foreach真正在执行器上运行时,其引用的counter已经不再是驱动器上所定义的那个counter副本了,驱动器内存中仍然会有一个counter变量副本,但是这个副本对执行器是不可见的!执行器只能看到其所收到的序列化闭包中包含的counter副本。因此,最终驱动器上得到的counter将会是0。 为了确保类似这样的场景下,代码能有确定的行为,这里应该使用累加器(Accumulator)。累加器是Spark中专门用于集群跨节点分布式执行计算中,安全地更新同一变量的机制。本指南中专门有一节详细说明累加器。 通常来说,闭包(由循环或本地方法组成),不应该改写全局状态。Spark中改写闭包之外对象的行为是未定义的。这种代码,有可能在本地模式下能正常工作,但这只是偶然情况,同样的代码在分布式模式下其行为很可能不是你想要的。所以,如果需要全局聚合,请记得使用累加器(Accumulator)。 打印RDD中的元素 另一种常见习惯是,试图用 rdd.foreach(println) 或者 rdd.map(println) 来打印RDD中所有的元素。如果是在单机上,这种写法能够如预期一样,打印出RDD所有元素。然后,在集群模式下,这些输出将会被打印到执行器的标准输出(stdout)上,因此驱动器的标准输出(stdout)上神马也看不到!如果真要在驱动器上把所有RDD元素都打印出来,你可以先调用collect算子,把RDD元素先拉倒驱动器上来,代码可能是这样:rdd.collect().foreach(println)。不过如果RDD很大的话,有可能导致驱动器内存溢出,因为collect会把整个RDD都弄到驱动器所在单机上来;如果你只是需要打印一部分元素,那么take是不更安全的选择:rdd.take(100).foreach(println) 使用键值对 Scala Java Python 大部分Spark算子都能在包含任意类型对象的RDD上工作,但也有一部分特殊的算子要求RDD包含的元素必须是键值对(key-value pair)。这种算子常见于做分布式混洗(shuffle)操作,如:以key分组或聚合。 在Scala中,这种操作在包含Tuple2(内建与scala语言,可以这样创建:(a, b) )类型对象的RDD上自动可用。键值对操作是在PairRDDFunctions类上可用,这个类型也会自动包装到包含tuples的RDD上。 例如,以下代码将使用reduceByKey算子来计算文件中每行文本出现的次数: val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b) 同样,我们还可以用 counts.sortByKey() 来对这些键值对按字母排序,最后再用 counts.collect() 将数据以对象数据组的形式拉到驱动器内存中。 注意:如果使用自定义类型对象做键值对中的key的话,你需要确保自定义类型实现了 equals() 方法(通常需要同时也实现hashCode()方法)。完整的细节可以参考:Object.hashCode() documentation 转换算子 – transformation 以下是Spark支持的一些常用transformation算子。详细请参考RDD API doc (Scala,Java,Python,R) 以及 键值对 RDD 函数 (Scala,Java) 。 transformation算子 作用 map(func) 返回一个新的分布式数据集,其中每个元素都是由源RDD中一个元素经func转换得到的。 filter(func) 返回一个新的数据集,其中包含的元素来自源RDD中元素经func过滤后(func返回true时才选中)的结果 flatMap(func) 类似于map,但每个输入元素可以映射到0到n个输出元素(所以要求func必须返回一个Seq而不是单个元素) mapPartitions(func) 类似于map,但基于每个RDD分区(或者数据block)独立运行,所以如果RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。 mapPartitionsWithIndex(func) 类似于 mapPartitions,只是func 多了一个整型的分区索引值,因此如果RDD包含元素类型为T,则func 必须是 Iterator<T> => Iterator<U> 的映射函数。 sample(withReplacement,fraction,seed) 采样部分(比例取决于fraction)数据,同时可以指定是否使用回置采样(withReplacement),以及随机数种子(seed) union(otherDataset) 返回源数据集和参数数据集(otherDataset)的并集 intersection(otherDataset) 返回源数据集和参数数据集(otherDataset)的交集 distinct([numTasks])) 返回对源数据集做元素去重后的新数据集 groupByKey([numTasks]) 只对包含键值对的RDD有效,如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable<V>) 对。 注意:如果你需要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以获得更好的性能。 注意:默认情况下,输出计算的并行度取决于源RDD的分区个数。当然,你也可以通过设置可选参数 numTasks 来指定并行任务的个数。 reduceByKey(func, [numTasks]) 如果源RDD包含元素类型(K, V) 对,则该算子也返回包含(K, V) 对的RDD,只不过每个key对应的value是经过func聚合后的结果,而func本身是一个 (V, V) => V 的映射函数。 另外,和 groupByKey 类似,可以通过可选参数numTasks指定reduce任务的个数。 aggregateByKey(zeroValue)(seqOp,combOp, [numTasks]) 如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由combOp函数 和 一个“0”值zeroValue聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数numTasks指定reduce任务的个数。 sortByKey([ascending], [numTasks]) 如果源RDD包含元素类型(K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于ascending参数) join(otherDataset, [numTasks]) 如果源RDD包含元素类型(K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中将包含内关联后key对应的 (K, (V, W)) 对。外关联(Outer joins)操作请参考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。 cogroup(otherDataset, [numTasks]) 如果源RDD包含元素类型(K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。该算子还有个别名:groupWith cartesian(otherDataset) 如果源RDD包含元素类型 T 且参数RDD(otherDataset)包含元素类型 U,则返回的新RDD包含前二者的笛卡尔积,其元素类型为 (T, U) 对。 pipe(command,[envVars]) 以shell命令行管道处理RDD的每个分区,如:Perl 或者 bash 脚本。 RDD中每个元素都将依次写入进程的标准输入(stdin),然后按行输出到标准输出(stdout),每一行输出字符串即成为一个新的RDD元素。 coalesce(numPartitions) 将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。 repartition(numPartitions) 将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。 repartitionAndSortWithinPartitions(partitioner) 根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。 动作算子 – action 以下是Spark支持的一些常用action算子。详细请参考RDD API doc (Scala,Java,Python,R) 以及 键值对 RDD 函数 (Scala,Java) 。 Action算子 作用 reduce(func) 将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,并且func需要满足交换律 和 结合律以便支持并行计算) collect() 将数据集中所有元素以数组形式返回驱动器(driver)程序。通常用于,在RDD进行了filter或其他过滤操作后,将一个足够小的数据子集返回到驱动器内存中。 count() 返回数据集中元素个数 first() 返回数据集中首个元素(类似于 take(1) ) take(n) 返回数据集中前n个元素 takeSample(withReplacement,num, [seed]) 返回数据集的随机采样子集,最多包含num个元素,withReplacement表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。 takeOrdered(n,[ordering]) 按元素排序(可以通过 ordering 自定义排序规则)后,返回前n个元素 saveAsTextFile(path) 将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。 保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。 saveAsSequenceFile(path) (Java and Scala) 将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等) saveAsObjectFile(path) (Java and Scala) 将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。 countByKey() 只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。 foreach(func) 在RDD的每个元素上运行 func 函数。通常被用于累加操作,如:更新一个累加器(Accumulator) 或者 和外部存储系统互操作。 注意:用 foreach 操作出累加器之外的变量可能导致未定义的行为。更详细请参考前面的“理解闭包”(Understanding closures)这一小节。 混洗操作 有一些Spark算子会触发众所周知的混洗(Shuffle)事件。Spark中的混洗机制是用于将数据重新分布,其结果是所有数据将在各个分区间重新分组。一般情况下,混洗需要跨执行器(executor)或跨机器复制数据,这也是混洗操作一般都比较复杂而且开销大的原因。 背景 为了理解混洗阶段都发生了哪些事,我首先以reduceByKey算子为例来看一下。reduceByKey算子会生成一个新的RDD,将源RDD中一个key对应的多个value组合进一个tuple - 然后将这些values输入给reduce函数,得到的result再和key关联放入新的RDD中。这个算子的难点在于对于某一个key来说,并非其对应的所有values都在同一个分区(partition)中,甚至有可能都不在同一台机器上,但是这些values又必须放到一起计算reduce结果。 在Spark中,通常是由于为了进行某种计算操作,而将数据分布到所需要的各个分区当中。而在计算阶段,单个任务(task)只会操作单个分区中的数据 – 因此,为了组织好每个reduceByKey中reduce任务执行时所需的数据,Spark需要执行一个多对多操作。即,Spark需要读取RDD的所有分区,并找到所有key对应的所有values,然后跨分区传输这些values,并将每个key对应的所有values放到同一分区,以便后续计算各个key对应values的reduce结果 – 这个过程就叫做混洗(Shuffle)。 虽然混洗好后,各个分区中的元素和分区自身的顺序都是确定的,但是分区中元素的顺序并非确定的。如果需要混洗后分区内的元素有序,可以参考使用以下混洗操作: mapPartitions 使用 .sorted 对每个分区排序 repartitionAndSortWithinPartitions重分区的同时,对分区进行排序,比自行组合repartition和sort更高效 sortBy 创建一个全局有序的RDD 会导致混洗的算子有:重分区(repartition)类算子,如:repartition和coalesce;ByKey类算子(除了计数类的,如 countByKey) 如:groupByKey和reduceByKey;以及Join类算子,如:cogroup和join. 性能影响 混洗(Shuffle)之所以开销大,是因为混洗操作需要引入磁盘I/O,数据序列化以及网络I/O等操作。为了组织好混洗数据,Spark需要生成对应的任务集 – 一系列map任务用于组织数据,再用一系列reduce任务来聚合数据。注意这里的map、reduce是来自MapReduce的术语,和Spark的map、reduce算子并没有直接关系。 在Spark内部,单个map任务的输出会尽量保存在内存中,直至放不下为止。然后,这些输出会基于目标分区重新排序,并写到一个文件里。在reduce端,reduce任务只读取与之相关的并已经排序好的blocks。 某些混洗算子会导致非常明显的内存开销增长,因为这些算子需要在数据传输前后,在内存中维护组织数据记录的各种数据结构。特别地,reduceByKey和aggregateByKey都会在map端创建这些数据结构,而ByKey系列算子都会在reduce端创建这些数据结构。如果数据在内存中存不下,Spark会把数据吐到磁盘上,当然这回导致额外的磁盘I/O以及垃圾回收的开销。 混洗还会再磁盘上生成很多临时文件。以Spark-1.3来说,这些临时文件会一直保留到其对应的RDD被垃圾回收才删除。之所以这样做,是因为如果血统信息需要重新计算的时候,这些混洗文件可以不必重新生成。如果程序持续引用这些RDD或者垃圾回收启动频率较低,那么这些垃圾回收可能需要等较长的一段时间。这就意味着,长时间运行的Spark作业可能会消耗大量的磁盘。Spark的临时存储目录,是由spark.local.dir 配置参数指定的。 混洗行为可以由一系列配置参数来调优。参考Spark配置指南(Spark Configuration Guide)中“混洗行为”这一小节。 RDD持久化 Spark的一项关键能力就是它可以持久化(或者缓存)数据集在内存中,从而跨操作复用这些数据集。如果你持久化了一个RDD,那么每个节点上都会存储该RDD的一些分区,这些分区是由对应的节点计算出来并保持在内存中,后续可以在其他施加在该RDD上的action算子中复用(或者从这些数据集派生新的RDD)。这使得后续动作的速度提高很多(通常高于10倍)。因此,缓存对于迭代算法和快速交互式分析是一个很关键的工具。 你可以用persist() 或者 cache() 来标记一下需要持久化的RDD。等到该RDD首次被施加action算子的时候,其对应的数据分区就会被保留在内存里。同时,Spark的缓存具备一定的容错性 – 如果RDD的任何一个分区丢失了,Spark将自动根据其原来的血统信息重新计算这个分区。 另外,每个持久化的RDD可以使用不同的存储级别,比如,你可以把RDD保存在磁盘上,或者以java序列化对象保存到内存里(为了省空间),或者跨节点多副本,或者使用Tachyon存到虚拟机以外的内存里。这些存储级别都可以由persist()的参数StorageLevel对象来控制。cache() 方法本身就是一个使用默认存储级别做持久化的快捷方式,默认存储级别是 StorageLevel.MEMORY_ONLY(以java序列化方式存到内存里)。完整的存储级别列表如下: 存储级别 含义 MEMORY_ONLY 以未序列化的Java对象形式将RDD存储在JVM内存中。如果RDD不能全部装进内存,那么将一部分分区缓存,而另一部分分区将每次用到时重新计算。这个是Spark的RDD的默认存储级别。 MEMORY_AND_DISK 以未序列化的Java对象形式存储RDD在JVM中。如果RDD不能全部装进内存,则将不能装进内存的分区放到磁盘上,然后每次用到的时候从磁盘上读取。 MEMORY_ONLY_SER 以序列化形式存储RDD(每个分区一个字节数组)。通常这种方式比未序列化存储方式要更省空间,尤其是如果你选用了一个比较好的序列化协议(fast serializer),但是这种方式也相应的会消耗更多的CPU来读取数据。 MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER类似,只是当内存装不下的时候,会将分区的数据吐到磁盘上,而不是每次用到都重新计算。 DISK_ONLY RDD数据只存储于磁盘上。 MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面没有”_2″的级别相对应,只不过每个分区数据会在两个节点上保存两份副本。 OFF_HEAP (实验性的) 将RDD以序列化格式保存到Tachyon。与MEMORY_ONLY_SER相比,OFF_HEAP减少了垃圾回收开销,并且使执行器(executor)进程更小且可以共用同一个内存池,这一特性在需要大量消耗内存和多Spark应用并发的场景下比较吸引人。而且,因为RDD存储于Tachyon中,所以一个执行器挂了并不会导致数据缓存的丢失。这种模式下Tachyon 的内存是可丢弃的。因此,Tachyon并不会重建一个它逐出内存的block。如果你打算用Tachyon做为堆外存储,Spark和Tachyon具有开箱即用的兼容性。请参考这里,有建议使用的Spark和Tachyon的匹配版本对:page。 注意:在Python中存储的对象总是会使用Pickle做序列化,所以这时是否选择一个序列化级别已经无关紧要了。 Spark会自动持久化一些混洗操作(如:reduceByKey)的中间数据,即便用户根本没有调用persist。这么做是为了避免一旦有一个节点在混洗过程中失败,就要重算整个输入数据。当然,我们还是建议对需要重复使用的RDD调用其persist算子。 如何选择存储级别? Spark的存储级别主要可于在内存使用和CPU占用之间做一些权衡。建议根据以下步骤来选择一个合适的存储级别: 如果RDD能使用默认存储级别(MEMORY_ONLY),那就尽量使用默认级别。这是CPU效率最高的方式,所有RDD算子都能以最快的速度运行。 如果步骤1的答案是否(不适用默认级别),那么可以尝试MEMORY_ONLY_SER级别,并选择一个高效的序列化协议(selecting a fast serialization library),这回大大节省数据对象的存储空间,同时速度也还不错。 尽量不要把数据吐到磁盘上,除非:1.你的数据集重新计算的代价很大;2.你的数据集是从一个很大的数据源中过滤得到的结果。否则的话,重算一个分区的速度很可能和从磁盘上读取差不多。 如果需要支持容错,可以考虑使用带副本的存储级别(例如:用Spark来服务web请求)。所有的存储级别都能够以重算丢失数据的方式来提供容错性,但是带副本的存储级别可以让你的应用持续的运行,而不必等待重算丢失的分区。 在一些需要大量内存或者并行多个应用的场景下,实验性的OFF_HEAP会有以下几个优势: 这个级别下,可以允许多个执行器共享同一个Tachyon中内存池。 可以有效地减少垃圾回收的开销。 即使单个执行器挂了,缓存数据也不会丢失。 删除数据 Spark能够自动监控各个节点上缓存使用率,并且以LRU(最近经常使用)的方式将老数据逐出内存。如果你更喜欢手动控制的话,可以用RDD.unpersist() 方法来删除无用的缓存。 共享变量 一般而言,当我们给Spark算子(如 map 或 reduce)传递一个函数时,这些函数将会在远程的集群节点上运行,并且这些函数所引用的变量都是各个节点上的独立副本。这些变量都会以副本的形式复制到各个机器节点上,如果更新这些变量副本的话,这些更新并不会传回到驱动器(driver)程序。通常来说,支持跨任务的可读写共享变量是比较低效的。不过,Spark还是提供了两种比较通用的共享变量:广播变量和累加器。 广播变量 广播变量提供了一种只读的共享变量,它是把在每个机器节点上保存一个缓存,而不是每个任务保存一份副本。通常可以用来在每个节点上保存一个较大的输入数据集,这要比常规的变量副本更高效(一般的变量是每个任务一个副本,一个节点上可能有多个任务)。Spark还会尝试使用高效的广播算法来分发广播变量,以减少通信开销。 Spark的操作有时会有多个阶段(stage),不同阶段之间的分割线就是混洗操作。Spark会自动广播各个阶段用到的公共数据。这些方式广播的数据都是序列化过的,并且在运行各个任务前需要反序列化。这也意味着,显示地创建广播变量,只有在跨多个阶段(stage)的任务需要同样的数据 或者 缓存数据的序列化和反序列化格式很重要的情况下才是必须的。 广播变量可以通过一个变量v来创建,只需调用 SparkContext.broadcast(v)即可。这个广播变量是对变量v的一个包装,要访问其值,可以调用广播变量的 value 方法。代码示例如下: Scala Java Python scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) 广播变量创建之后,集群中任何函数都不应该再使用原始变量v,这样才能保证v不会被多次复制到同一个节点上。另外,对象v在广播后不应该再被更新,这样才能保证所有节点上拿到同样的值(例如,更新后,广播变量又被同步到另一新节点,新节点有可能得到的值和其他节点不一样)。 累加器 累加器是一种只支持满足结合律的“累加”操作的变量,因此它可以很高效地支持并行计算。利用累加器可以实现计数(类似MapReduce中的计数器)或者求和。Spark原生支持了数字类型的累加器,开发者也可以自定义新的累加器。如果创建累加器的时候给了一个名字,那么这个名字会展示在Spark UI上,这对于了解程序运行处于哪个阶段非常有帮助(注意:Python尚不支持该功能)。 创捷累加器时需要赋一个初始值v,调用 SparkContext.accumulator(v) 可以创建一个累加器。后续集群中运行的任务可以使用 add 方法 或者 += 操作符 (仅Scala和Python支持)来进行累加操作。不过,任务本身并不能读取累加器的值,只有驱动器程序可以用 value 方法访问累加器的值。 以下代码展示了如何使用累加器对一个元素数组求和: Scala Java Python scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10 以上代码使用了Spark内建支持的Int型累加器,开发者也可以通过子类化AccumulatorParam来自定义累加器。累加器接口(AccumulatorParam)主要有两个方法:1. zero:这个方法为累加器提供一个“零值”,2.addInPlace 将收到的两个参数值进行累加。例如,假设我们需要为Vector提供一个累加机制,那么可能的实现方式如下: object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam) 如果使用Scala,Spark还支持几种更通用的接口:1.Accumulable,这个接口可以支持所累加的数据类型与结果类型不同(如:构建一个收集元素的list);2.SparkContext.accumulableCollection 方法可以支持常用的Scala集合类型。 对于在action算子中更新的累加器,Spark保证每个任务对累加器的更新只会被应用一次,例如,某些任务如果重启过,则不会再次更新累加器。而如果在transformation算子中更新累加器,那么用户需要注意,一旦某个任务因为失败被重新执行,那么其对累加器的更新可能会实施多次。 累加器并不会改变Spark懒惰求值的运算模型。如果在RDD算子中更新累加器,那么其值只会在RDD做action算子计算的时候被更新一次。因此,在transformation算子(如:map)中更新累加器,其值并不能保证一定被更新。以下代码片段说明了这一特性: Scala Java Python val accum = sc.accumulator(0) data.map { x => accum += x; f(x) } // 这里,accum任然是0,因为没有action算子,所以map也不会进行实际的计算 部署到集群 应用提交指南(application submission guide)中描述了如何向集群提交应用。换句话说,就是你需要把你的应用打包成 JAR文件(Java/Scala)或者一系列 .py 或 .zip 文件(Python),然后再用 bin/spark-submit 脚本将其提交给Spark所支持的集群管理器。 从Java/Scala中启动Spark作业 org.apache.spark.launcher包提供了简明的Java API,可以将Spark作业作为子进程启动。 单元测试 Spark对所有常见的单元测试框架提供友好的支持。你只需要在测试中创建一个SparkContext对象,然后吧master URL设为local,运行测试操作,最后调用 SparkContext.stop() 来停止测试。注意,一定要在 finally 代码块或者单元测试框架的 tearDown方法里调用SparkContext.stop(),因为Spark不支持同一程序中有多个SparkContext对象同时运行。 从1.0之前版本迁移过来 Scala Java Python Spark 1.0 冻结了Spark Core 1.x 系列的核心API,只要是没有标记为 “experimental” 或者 “developer API”的API,在未来的版本中会一直支持。对于Scala用户来说,唯一的变化就是分组相关的算子,如:groupByKey, cogroup, join,这些算子的返回类型由 (Key, Seq[Value]) 变为 (Key, Iterable[Value])。 更详细迁移向导请参考这里:Spark Streaming,MLlib以及GraphX. 下一步 你可以去Spark的官网上看看示例程序(example Spark programs)。另外,Spark代码目录下也自带了不少例子,见 examples 目录(Scala,Java,Python,R)。你可以把示例中的类名传给 bin/run-example 脚本来运行这些例子;例如: ./bin/run-example SparkPi 如果需要运行Python示例,则需要使用 spark-submit 脚本: ./bin/spark-submit examples/src/main/python/pi.py 对R语言,同样也需要使用 spark-submit: ./bin/spark-submit examples/src/main/r/dataframe.R 配置(configuration)和调优(tuning)指南提供了不少最佳实践的信息,可以帮助你优化程序,特别是这些信息可以帮助你确保数据以一种高效的格式保存在内存里。集群模式概览(cluster mode overview)这篇文章描述了分布式操作中相关的组件,以及Spark所支持的各种集群管理器。 最后,完整的API文件见:Scala,Java,Python以及R. 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》Spark调优

Spark调优 由于大部分Spark计算都是在内存中完成的,所以Spark程序的瓶颈可能由集群中任意一种资源导致,如:CPU、网络带宽、或者内存等。最常见的情况是,数据能装进内存,而瓶颈是网络带宽;当然,有时候我们也需要做一些优化调整来减少内存占用,例如将RDD以序列化格式保存(storing RDDs in serialized form)。本文将主要涵盖两个主题:1.数据序列化(这对于优化网络性能极为重要);2.减少内存占用以及内存调优。同时,我们也会提及其他几个比较小的主题。 数据序列化 序列化在任何一种分布式应用性能优化时都扮演几位重要的角色。如果序列化格式序列化过程缓慢,或者需要占用字节很多,都会大大拖慢整体的计算效率。通常,序列化都是Spark应用优化时首先需要关注的地方。Spark着眼于要达到便利性(允许你在计算过程中使用任何Java类型)和性能的一个平衡。Spark主要提供了两个序列化库: Java serialization: 默认情况,Spark使用Java自带的ObjectOutputStream 框架来序列化对象,这样任何实现了java.io.Serializable接口的对象,都能被序列化。同时,你还可以通过扩展java.io.Externalizable来控制序列化性能。Java序列化很灵活但性能较差,同时序列化后占用的字节数也较多。 Kryo serialization: Spark还可以使用Kryo 库(版本2)提供更高效的序列化格式。Kryo的序列化速度和字节占用都比Java序列化好很多(通常是10倍左右),但Kryo不支持所有实现了Serializable接口的类型,它需要你在程序中 register 需要序列化的类型,以得到最佳性能。 要切换到使用 Kryo,你可以在SparkConf初始化的时候调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。目前,Kryo不是默认的序列化格式,因为它需要你在使用前注册需要序列化的类型,不过我们还是建议在对网络敏感的应用场景下使用Kryo。 Spark对一些常用的Scala核心类型(包括在Twitter chill库的AllScalaRegistrar中)自动使用Kryo序列化格式。 如果你的自定义类型需要使用Kryo序列化,可以用 registerKryoClasses 方法先注册: val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf) Kryo的文档(Kryo documentation)中有详细描述了更多的高级选项,如:自定义序列化代码等。 如果你的对象很大,你可能需要增大 spark.kryoserializer.buffer 配置项(config)。其值至少需要大于最大对象的序列化长度。 最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间。 内存调优 内存占用调优主要需要考虑3点:1.数据占用的总内存(你多半会希望整个数据集都能装进内存吧);2.访问数据集中每个对象的开销;3.垃圾回收的开销(如果你的数据集中对象周转速度很快的话)。 一般,Java对象的访问时很快的,但同时Java对象会比原始数据(仅包含各个字段值)占用的空间多2~5倍。主要原因有: 每个Java对象都有一个对象头(object header),对象头大约占用16字节,其中包含像其对应class的指针这样的信息。对于一些包含较少数据的对象(比如只包含一个Int字段),这个对象头可能比对象数据本身还大。 Java字符串(String)有大约40子节点额外开销(Java String以Char数据的形式保存原始数据,所以需要一些额外的字段,如数组长度等),并且每个字符都以两字节的UTF-16编码在内部保存。因此,10个字符的String很容易就占了60字节。 一些常见的集合类,如 HashMap、LinkedList,使用的是链表类数据结构,因此它们对每项数据都有一个包装器。这些包装器对象不仅其自身就有“对象头”,同时还有指向下一个包装器对象的链表指针(通常为8字节)。 原始类型的集合通常也是以“装箱”的形式包装成对象(如:java.lang.Integer)。 本节只是Spark内存管理的一个概要,下面我们会更详细地讨论各种Spark内存调优的具体策略。特别地,我们会讨论如何评估数据的内存使用量,以及如何改进 – 要么改变你的数据结构,要么以某种序列化格式存储数据。最后,我们还会讨论如何调整Spark的缓存大小,以及如何调优Java的垃圾回收器。 内存管理概览 Spark中内存主要用于两类目的:执行计算和数据存储。执行计算的内存主要用于混洗(Shuffle)、关联(join)、排序(sort)以及聚合(aggregation),而数据存储的内存主要用于缓存和集群内部数据传播。Spark中执行计算和数据存储都是共享同一个内存区域(M)。如果执行计算没有占用内存,那么数据存储可以申请占用所有可用的内存,反之亦然。执行计算可能会抢占数据存储使用的内存,并将存储于内存的数据逐出内存,直到数据存储占用的内存比例降低到一个指定的比例(R)。换句话说,R是M基础上的一个子区域,这个区域的内存数据永远不会被逐出内存。然而,数据存储不会抢占执行计算的内存(否则实现太复杂了)。 这样设计主要有这么几个需要考虑的点。首先,不需要缓存数据的应用可以把整个空间用来执行计算,从而避免频繁地把数据吐到磁盘上。其次,需要缓存数据的应用能够有一个数据存储比例(R)的最低保证,也避免这部分缓存数据被全部逐出内存。最后,这个实现方式能够在默认情况下,为大多数使用场景提供合理的性能,而不需要专家级用户来设置内存使用如何划分。 虽然有两个内存划分相关的配置参数,但一般来说,用户不需要设置,因为默认值已经能够适用于绝大部分的使用场景: spark.memory.fraction 表示上面M的大小,其值为相对于JVM堆内存的比例(默认0.75)。剩余的25%是为其他用户数据结构、Spark内部元数据以及避免OOM错误的安全预留空间(大量稀疏数据和异常大的数据记录)。 spark.memory.storageFraction 表示上面R的大小,其值为相对于M的一个比例(默认0.5)。R是M中专门用于缓存数据块,且这部分数据块永远不会因执行计算任务而逐出内存。 评估内存消耗 确定一个数据集占用内存总量最好的办法就是,创建一个RDD,并缓存到内存中,然后再到web UI上”Storage”页面查看。页面上会展示这个RDD总共占用了多少内存。 要评估一个特定对象的内存占用量,可以用 SizeEstimator.estimate 方法。这个方法对试验哪种数据结构能够裁剪内存占用量比较有用,同时,也可以帮助用户了解广播变量在每个执行器堆上占用的内存量。 数据结构调优 减少内存消耗的首要方法就是避免过多的Java封装(减少对象头和额外辅助字段),比如基于指针的数据结构和包装对象等。以下有几条建议: 设计数据结构的时候,优先使用对象数组和原生类型,减少对复杂集合类型(如:HashMap)的使用。fastutil提供了一些很方便的原声类型集合,同时兼容Java标准库。 尽可能避免嵌套大量的小对象和指针。 对应键值应尽量使用数值型或枚举型,而不是字符串型。 如果内存小于32GB,可以设置JVM标志参数 -XX:+UseCompressdOops 将指针设为4字节而不是8字节。你可以在spark-env.sh中设置这个参数。 序列化RDD存储 如果经过上面的调整后,存储的数据对象还是太大,那么你可以试试将这些对象以序列化格式存储,所需要做的只是通过RDD persistence API设置好存储级别,如:MEMORY_ONLY_SER。Spark会将RDD的每个分区以一个巨大的字节数组形式存储起来。以序列化格式存储的唯一缺点就是访问数据会变慢一点,因为Spark需要反序列化每个被访问的对象。如果你需要序列化缓存数据,我们强烈建议你使用Kryo(using Kryo),和Java序列化相比,Kryo能大大减少序列化对象占用的空间(当然也比原始Java对象小很多)。 垃圾回收调优 JVM的垃圾回收在某些情况下可能会造成瓶颈,比如,你的RDD存储经常需要“换入换出”(新RDD抢占了老RDD内存,不过如果你的程序没有这种情况的话那JVM垃圾回收一般不是问题,比如,你的RDD只是载入一次,后续只是在这一个RDD上做操作)。当Java需要把老对象逐出内存的时候,JVM需要跟踪所有的Java对象,并找出那些对象已经没有用了。概括起来就是,垃圾回收的开销和对象个数成正比,所以减少对象的个数(比如用 Int数组取代 LinkedList),就能大大减少垃圾回收的开销。当然,一个更好的方法就如前面所说的,以序列化形式存储数据,这时每个RDD分区都只包含有一个对象了(一个巨大的字节数组)。在尝试其他技术方案前,首先可以试试用序列化RDD的方式(serialized caching)评估一下GC是不是一个瓶颈。 如果你的作业中各个任务需要的工作内存和节点上存储的RDD缓存占用的内存产生冲突,那么GC很可能会出现问题。下面我们将讨论一下如何控制好RDD缓存使用的内存空间,以减少这种冲突。 衡量GC的影响 GC调优的第一步是统计一下,垃圾回收启动的频率以及GC所使用的总时间。给JVM设置一下这几个参数(参考Spark配置指南 –configuration guide,查看Spark作业中的Java选项参数):-verbose:gc -XX:+PrintGCDetails,就可以在后续Spark作业的worker日志中看到每次GC花费的时间。注意,这些日志是在集群worker节点上(在各节点的工作目录下stdout文件中),而不是你的驱动器所在节点。 高级GC调优 为了进一步调优GC,我们就需要对JVM内存管理有一个基本的了解: Java堆内存可分配的空间有两个区域:新生代(Younggeneration)和老生代(Oldgeneration)。新生代用以保存生存周期短的对象,而老生代则是保存生存周期长的对象。 新生代区域被进一步划分为三个子区域:Eden,Survivor1,Survivor2。 简要描述一下垃圾回收的过程:如果Eden区满了,则启动一轮minor GC回收Eden中的对象,生存下来(没有被回收掉)的Eden中的对象和Survivor1区中的对象一并复制到Survivor2中。两个Survivor区域是互相切换使用的(就是说,下次从Eden和Survivor2中复制到Survivor1中)。如果某个对象的年龄(每次GC所有生存下来的对象长一岁)超过某个阈值,或者Survivor2(下次是Survivor1)区域满了,则将对象移到老生代(Old区)。最终如果老生代也满了,就会启动full GC。 Spark GC调优的目标就是确保老生代(Oldgeneration)只保存长生命周期RDD,而同时新生代(Younggeneration)的空间又能足够保存短生命周期的对象。这样就能在任务执行期间,避免启动full GC。以下是GC调优的主要步骤: 从GC的统计日志中观察GC是否启动太多。如果某个任务结束前,多次启动了full GC,则意味着用以执行该任务的内存不够。 如果GC统计信息中显示,老生代内存空间已经接近存满,可以通过降低 spark.memory.storageFraction 来减少RDD缓存占用的内存;减少缓存对象总比任务执行缓慢要强! 如果major GC比较少,但minor GC很多的话,可以多分配一些Eden内存。你可以把Eden的大小设为高于各个任务执行所需的工作内存。如果要把Eden大小设为E,则可以这样设置新生代区域大小:-Xmn=4/3*E。(放大4/3倍,主要是为了给Survivor区域保留空间) 举例来说,如果你的任务会从HDFS上读取数据,那么单个任务的内存需求可以用其所读取的HDFS数据块的大小来评估。需要特别注意的是,解压后的HDFS块是解压前的2~3倍大。所以如果我们希望保留3~4个任务并行的工作内存,并且HDFS块大小为64MB,那么可以评估Eden的大小应该设为 4*3*64MB。 最后,再观察一下垃圾回收的启动频率和总耗时有没有什么变化。 我们的很多经验表明,GC调优的效果和你的程序代码以及可用的总内存相关。网上还有不少调优的选项说明(many more tuning options),但总体来说,就是控制好full GC的启动频率,就能有效减少垃圾回收开销。 其他注意事项 并行度 一般来说集群并不会满负荷运转,除非你吧每个操作的并行度都设得足够大。Spark会自动根据对应的输入文件大小来设置“map”类算子的并行度(当然你可以通过一个SparkContext.textFile等函数的可选参数来控制并行度),而对于想 groupByKey 或reduceByKey这类 “reduce” 算子,会使用其各父RDD分区数的最大值。你可以将并行度作为构建RDD第二个参数(参考spark.PairRDDFunctions),或者设置 spark.default.parallelism 这个默认值。一般来说,评估并行度的时候,我们建议2~3个任务共享一个CPU。 Reduce任务的内存占用 如果RDD比内存要大,有时候你可能收到一个OutOfMemoryError,但其实这是因为你的任务集中的某个任务太大了,如reduce任务groupByKey。Spark的混洗(Shuffle)算子(sortByKey,groupByKey,reduceByKey,join等)会在每个任务中构建一个哈希表,以便在任务中对数据分组,这个哈希表有时会很大。最简单的修复办法就是增大并行度,以减小单个任务的输入集。Spark对于200ms以内的短任务支持非常好,因为Spark可以跨任务复用执行器JVM,任务的启动开销很小,因此把并行度增加到比集群中总CPU核数还多是没有任何问题的。 广播大变量 使用SparkContext中的广播变量相关功能(broadcast functionality)能大大减少每个任务本身序列化的大小,以及集群中启动作业的开销。如果你的Spark任务正在使用驱动器(driver)程序中定义的巨大对象(比如:静态查询表),请考虑使用广播变量替代之。Spark会在master上将各个任务的序列化后大小打印出来,所以你可以检查一下各个任务是否过大;通常来说,大于20KB的任务就值得优化一下。 数据本地性 数据本地性对Spark作业往往会有较大的影响。如果代码和其所操作的数据在统一节点上,那么计算速度肯定会更快一些。但如果二者不在一起,那必然需要挪动其中之一。一般来说,挪动序列化好的代码肯定比挪动一大堆数据要快。Spark就是基于这个一般性原则来构建数据本地性的调度。 数据本地性是指代码和其所处理的数据的距离。基于数据当前的位置,数据本地性可以划分成以下几个层次(按从近到远排序): PROCESS_LOCAL 数据和运行的代码处于同一个JVM进程内。 NODE_LOCAL 数据和代码处于同一节点。例如,数据处于HDFS上某个节点,而对应的执行器(executor)也在同一个机器节点上。这会比PROCESS_LOCAL稍微慢一些,因为数据需要跨进程传递。 NO_PREF 数据在任何地方处理都一样,没有本地性偏好。 RACK_LOCAL 数据和代码处于同一个机架上的不同机器。这时,数据和代码处于不同机器上,需要通过网络传递,但还是在同一个机架上,一般也就通过一个交换机传输即可。 ANY 数据在网络中其他未知,即数据和代码不在同一个机架上。 Spark倾向于让所有任务都具有最佳的数据本地性,但这并非总是可行的。某些情况下,可能会出现一些空闲的执行器(executor)没有待处理的数据,那么Spark可能就会牺牲一些数据本地性。有两种可能的选项:a)等待已经有任务的CPU,待其释放后立即在同一台机器上启动一个任务;b)立即在其他节点上启动新任务,并把所需要的数据复制过去。 而通常,Spark会等待一小会,看看是否有CPU会被释放出来。一旦等待超时,则立即在其他节点上启动并将所需的数据复制过去。数据本地性各个级别之间的回落超时可以单独配置,也可以在统一参数内一起设定;详细请参考configuration page中的 spark.locality 相关参数。如果你的任务执行时间比较长并且数据本地性很差,你就应该试试调大这几个参数,不过默认值一般都能适用于大多数场景了。 总结 本文是一个简短的Spark调优指南,列举了Spark应用调优一些比较重要的考虑点 – 最重要的就是,数据序列化和内存调优。对于绝大多数应用来说,用Kryo格式序列化数据能够解决大多数的性能问题。如果您有其他关于性能调优最佳实践的问题,欢迎邮件咨询(Spark mailing list)。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》监控和工具

监控和工具 监控Spark应用有很多种方式:web UI,metrics 以及外部工具。 Web界面 每个SparkContext都会启动一个web UI,其默认端口为4040,并且这个web UI能展示很多有用的Spark应用相关信息。包括: 一个stage和task的调度列表 一个关于RDD大小以及内存占用的概览 运行环境相关信息 运行中的执行器相关信息 你只需打开浏览器,输入 http://<driver-node>:4040 即可访问该web界面。如果有多个SparkContext在同时运行中,那么它们会从4040开始,按顺序依次绑定端口(4041,4042,等)。 注意,默认情况下,这些信息只有在Spark应用运行期内才可用。如果需要在Spark应用退出后仍然能在web UI上查看这些信息,则需要在应用启动前,将 spark.eventLog.enabled 设为 true。这项配置将会把Spark事件日志都记录到持久化存储中。 事后查看 Spark独立部署时,其对应的集群管理器也有其对应的web UI。如果Spark应用将其运行期事件日志保留下来了,那么独立部署集群管理器对应的web UI将会根据这些日志自动展示已经结束的Spark应用。 如果Spark是运行于Mesos或者YARN上的话,那么你需要开启Spark的history server,开启event log。开启history server需要如下指令: ./sbin/start-history-server.sh 如果使用file-system provider class(参考下面的 spark.history.provider),那么日志目录将会基于 spark.history.fs.logDirectory 配置项,并且在表达Spark应用的事件日志路径时,应该带上子目录。history server对应的web界面默认在这里 http://<server-url>:18080。同时,history server有一些可用的配置如下: 环境变量 含义 SPARK_DAEMON_MEMORY history server分配多少内存(默认: 1g) SPARK_DAEMON_JAVA_OPTS history server的 JVM参数(默认:none) SPARK_PUBLIC_DNS history server的外部访问地址,如果不配置,那么history server有可能会绑定server的内部地址,这可能会导致外部不能访问(默认:none) SPARK_HISTORY_OPTS history server配置项(默认:none):spark.history.* 属性名称 默认值 含义 spark.history.provider org.apache.spark.deploy .history.FsHistoryProvider Spark应用历史后台实现的类名。目前可用的只有spark自带的一个实现,支持在本地文件系统中查询应用日志。 spark.history.fs.logDirectory file:/tmp/spark-events history server加载应用日志的目录 spark.history.fs.update.interval 10s history server更新信息的时间间隔。每次更新将会检查磁盘上的日志是否有更新。 spark.history.retainedApplications 50 UI上保留的spark应用历史个数。超出的将按时间排序,删除最老的。 spark.history.ui.port 18080 history server绑定的端口 spark.history.kerberos.enabled false history server是否启用kerberos验证登陆。如果history server需要访问一个需要安全保证的hadoop集群,则需要开启这个功能。该配置设为true以后,需要同时配置 spark.history.kerberos.principal 和 spark.history.kerberos.keytab spark.history.kerberos.principal (none) 登陆history server的kerberos 主体名称 spark.history.kerberos.keytab (none) history server对应的kerberos keytab文件路径 spark.history.ui.acls.enable false 指定是否启用ACL以控制用户访问验证。如果启用,那么不管某个应用是否设置了 spark.ui.acls.enabled,访问控制都将检查用户是否有权限。Spark应用的owner始终有查看的权限,而其他用户则需要通过 spark.ui.view.acls 配置其访问权限。如果禁用,则不会检查访问权限。 spark.history.fs.cleaner.enabled false 指定history server是否周期性清理磁盘上的event log spark.history.fs.cleaner.interval 1d history server清理磁盘文件的时间间隔。只会清理比 spark.history.fs.cleaner.maxAge 时间长的磁盘文件。 spark.history.fs.cleaner.maxAge 7d 如果启用了history server周期性清理,比这个时间长的Spark作业历史文件将会被清理掉 注意,所有web界面上的 table 都可以点击其表头来排序,这样可以帮助用户做一些简单分析,如:发现跑的最慢的任务、数据倾斜等。 注意history server 只展示已经结束的Spark作业。一种通知Spark作业结束的方法是,显式地关闭SparkContext(通过调用 sc.stop(),或者在 使用 SparkContext() 处理其 setup 和 tear down 事件(适用于python),然后作业历史就会出现在web UI上了。 REST API 度量信息除了可以在UI上查看之外,还可以以JSON格式访问。这能使开发人员很容易构建新的Spark可视化和监控工具。JSON格式的度量信息对运行中的Spark应用和history server中的历史作业均有效。其访问端点挂载在 /api/v1 路径下。例如,对于history server,一般你可以通过 http://<server-url>:18080/api/v1 来访问,而对于运行中的应用,可以通过 http://localhost:4040/api/v1 来访问。 端点 含义 /applications 所有应用的列表 /applications/[app-id]/jobs 给定应用的全部作业列表 /applications/[app-id]/jobs/[job-id] 给定作业的细节 /applications/[app-id]/stages 给定应用的stage列表 /applications/[app-id]/stages/[stage-id] 给定stage的所有attempt列表 /applications/[app-id]/stages/[stage-id]/[stage-attempt-id] 给定attempt的详细信息 /applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary 指定attempt对应的所有task的概要度量信息 /applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList 指定的attempt的所有task的列表 /applications/[app-id]/executors 给定应用的所有执行器 /applications/[app-id]/storage/rdd 给定应用的已保存的RDD列表 /applications/[app-id]/storage/rdd/[rdd-id] 给定的RDD的存储详细信息 /applications/[app-id]/logs 将给定应用的所有attempt对应的event log以zip格式打包下载 /applications/[app-id]/[attempt-id]/logs 将给定attempt的所有attempt对应的event log以zip格式打包下载 如果在YARN上运行,每个应用都由多个attempts,所以 [app-id] 实际上是 [app-id]/[attempt-id]。 这些API端点都有版本号,所以基于这些API开发程序就比较容易。Spark将保证: 端点一旦添加进来,就不会删除 某个端点支持的字段永不删除 未来可能会增加新的端点 已有端点可能会增加新的字段 未来可能会增加新的API版本,但会使用不同的端点(如:api/v2 )。但新版本不保证向后兼容。 API版本可能会整个丢弃掉,但在丢弃前,一定会和新版本API共存至少一个小版本。 注意,在UI上检查运行中的应用时,虽然每次只能查看一个应用, 但applicatoins/[app-id] 这部分路径仍然是必须的。例如,你需要查看运行中应用的作业列表时,你需要输入 http://localhost:4040/api/v1/applications/[app-id]/jobs。虽然麻烦点,但这能保证两种模式下访问路径的一致性。 度量 Spark的度量子系统是可配置的,其功能是基于Coda Hale Metrics Library开发的。这套度量子系统允许用户以多种形式的汇报槽(sink)汇报Spark度量信息,包括:HTTP、JMX和CSV文件等。其对应的配置文件路径为:${SPARK_HOME}/conf/metrics.properties。当然,你可以通过spark.metrics.conf 这个Spark属性来自定义配置文件路径(详见configuration property)。Spark的各个组件都有其对应的度量实例,且这些度量实例之间是解耦的。这些度量实例中,你都可以配置一系列不同的汇报槽来汇报度量信息。以下是目前支持的度量实例: master: 对应Spark独立部署时的master进程。 applications: master进程中的一个组件,专门汇报各个Spark应用的度量信息。 worker: 对应Spark独立部署时的worker进程。 executor: 对应Spark执行器。 driver: 对应Spark驱动器进程(即创建SparkContext对象的进程)。 每个度量实例可以汇报给0~n个槽。以下是目前 org.apache.spark.metrics.sink 包中包含的几种汇报槽(sink): ConsoleSink:将度量信息打印到控制台。 CSVSink: 以特定的间隔,将度量信息输出到CSV文件。 JmxSink: 将度量信息注册到JMX控制台。 MetricsServlet: 在已有的Spark UI中增加一个servlet,对外提供JSON格式的度量数据。 GraphiteSink: 将度量数据发到Graphite节点。 Slf4jSink: 将度量数据发送给slf4j 打成日志。 Spark同样也支持Ganglia,但因为license限制的原因没有包含在默认的发布包中: GangliaSink: 将度量信息发送给一个Ganglia节点或者多播组。 如果需要支持GangliaSink的话,你需要自定义Spark构建包。注意,如果你包含了GangliaSink代码包的话,就必须同时将LGPL-licensed 协议包含进你的Spark包中。对于sbt用户,只需要在编译打包前设置好环境变量:SPARK_GANGLIA_LGPL即可。对于maven用户,启用 -Pspark-ganglia-lgpl 即可。另外,除了修改集群的Spark之外,用户程序还需要链接 spark-ganglia-lgpl 工件。 度量系统配置文件语法可以参考这个配置文件示例:${SPARK_HOME}/conf/metrics.properties.template 高级工具 以下是几个可以用以分析Spark性能的外部工具: 集群整体监控工具,如:Ganglia,可以提供集群整体的使用率和资源瓶颈视图。比如,Ganglia的仪表盘可以迅速揭示出整个集群的工作负载是否达到磁盘、网络或CPU限制。 操作系统分析工具,如:dstat,iostat, 以及iotop,可以提供单个节点上细粒度的分析剖面。 JVM工具可以帮助你分析JVM虚拟机,如:jstack可以提供调用栈信息,jmap可以转储堆内存数据,jstat可以汇报时序统计信息,jconsole可以直观的探索各种JVM属性,这对于熟悉JVM内部机制非常有用。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》Spark快速入门

快速入门 本教程是对Spark的一个快速简介。首先,我们通过Spark的交互式shell介绍一下API(主要是Python或Scala),然后展示一下如何用Java、Scala、Python写一个Spark应用。更完整参考看这里:programming guide 首先,请到Spark website下载一个Spark发布版本,以便后续方便学习。我们暂时还不会用到HDFS,所以你可以使用任何版本的Hadoop。 使用Spark shell交互式分析 基础 利用Spark shell 很容易学习Spark API,同时也Spark shell也是强大的交互式数据分析工具。Spark shell既支持Scala(Scala版本的shell在Java虚拟机中运行,所以在这个shell中可以引用现有的Java库),也支持Python。在Spark目录下运行下面的命令可以启动一个Spark shell: Scala Python ./bin/spark-shell Spark最主要的抽象概念是个分布式集合,也叫作弹性分布式数据集(Resilient Distributed Dataset –RDD)。RDD可以由Hadoop InputFormats读取HDFS文件创建得来,或者从其他RDD转换得到。下面我们就先利用Spark源代码目录下的README文件来新建一个RDD: scala> val textFile = sc.textFile("README.md") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 RDD有两种算子,action算子(actions)返回结果,transformation算子(transformations)返回一个新RDD。我们先来看一下action算子: scala> textFile.count() // Number of items in this RDD res0: Long = 126 scala> textFile.first() // First item in this RDD res1: String = # Apache Spark 再来看下如何使用transformation算子。我们利用filter这个transformation算子返回一个只包含原始文件子集的新RDD。 scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09 把这两个例子串起来,我们可以这样写: scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15 更多RDD算子 RDD action 和 transformation 算子可以做更加复杂的计算。下面的代码中,我们将找出文件中包含单词数最多的行有多少个单词: Scala Python scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15 首先,用一个map算子将每一行映射为一个整数,返回一个新RDD。然后,用reduce算子找出这个RDD中最大的单词数。map和reduce算组的参数都是scala 函数体(闭包),且函数体内可以使用任意的语言特性,或引用scala/java库。例如,我们可以调用其他函数。为了好理解,下面我们用Math.max作为例子: scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15 Hadoop上的MapReduce是大家耳熟能详的一种通用数据流模式。而Spark能够轻松地实现MapReduce流程: scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8 这个例子里,我使用了flatMap,map, andreduceByKey这几个transformation算子,把每个单词及其在文件中出现的次数转成一个包含(String,int)键值对的RDD,计算出每个单词在文件中出现的次数 scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...) 缓存 Spark同样支持把数据集拉到集群范围的内存缓存中。这对于需要重复访问的数据非常有用,比如:查询一些小而”热“(频繁访问)的数据集 或者 运行一些迭代算法(如 PageRank)。作为一个简单的示例,我们把 linesWithSpark 这个数据集缓存一下: Scala Python scala> linesWithSpark.cache() res7: spark.RDD[String] = spark.FilteredRDD@17e51082 scala> linesWithSpark.count() res8: Long = 19 scala> linesWithSpark.count() res9: Long = 19 用Spark来缓存一个100行左右的文件,看起来确实有点傻。但有趣的是,同样的代码可以用于缓存非常大的数据集,即使这些数据集可能分布在数十或数百个节点,也是一样。你可以用 bin/spark-shell 连到一个集群上来验证一下,更详细的请参考:programming guide. 独立的应用程序 假设我们想写一个独立的Spark应用程序。我们将快速的过一下一个简单的应用程序,分别用Scala(sbt编译),Java(maven编译)和Python。 Scala Java Python 首先用Scala新建一个简单的Spark应用 – 简单到连名字都叫SimpleApp.scala /* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } } 注意,应用程序需要定义一个main方法,而不是继承scala.App。scala.App的子类可能不能正常工作。 这个程序,统计了Spark README文件中分别包含‘a’和’b’的行数。注意,你需要把YOUR_SPARK_HOME换成你的Spark安装目录。与之前用spark-shell不同,这个程序有一个单独的SparkContext对象,我们初始化了这个SparkContext对象并将其作为程序的一部分。 我们把一个SparkConf对象传给SparkContext的构造函数,SparkConf对象包含了我们这个应用程序的基本信息和配置。 我们的程序需要依赖Spark API,所以我们需要包含一个sbt配置文件,simple.sbt,在这个文件里,我们可以配置Spark依赖项。这个文件同时也添加了Spark本身的依赖库: name := "Simple Project" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" 为了让sbt能正常工作,我们需要一个典型的目录结构来放SimpleApp.scala和simple.sbt程序。一旦建立好目录,我们就可以创建一个jar包,然后用spark-submit脚本运行我们的代码。 # Your directory layout should look like this $ find . . ./simple.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.10/simple-project_2.10-1.0.jar ... Lines with a: 46, Lines with b: 23 下一步 恭喜你!你的首个Spark应用已经跑起来了! 进一步的API参考,请看这里:Spark programming guide,或者在其他页面上点击“Programming Guides”菜单 如果想了解集群上运行应用程序,请前往:deployment overview 最后,Spark examples子目录下包含了多个示例,你可以这样来运行这些例子: # For Scala and Java, use run-example: ./bin/run-example SparkPi # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py # For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》Spark配置(二)

内存管理 属性名 默认值 含义 spark.memory.fraction 0.75 堆内存中用于执行、混洗和存储(缓存)的比例。这个值越低,则执行中溢出到磁盘越频繁,同时缓存被逐出内存也更频繁。这个配置的目的,是为了留出用户自定义数据结构、内部元数据使用的内存。推荐使用默认值。请参考this description. spark.memory.storageFraction 0.5 不会被逐出内存的总量,表示一个相对于 spark.memory.fraction的比例。这个越高,那么执行混洗等操作用的内存就越少,从而溢出磁盘就越频繁。推荐使用默认值。更详细请参考this description. spark.memory.offHeap.enabled true 如果true,Spark会尝试使用堆外内存。启用 后,spark.memory.offHeap.size必须为正数。 spark.memory.offHeap.size 0 堆外内存分配的大小(绝对值)。这个设置不会影响堆内存的使用,所以你的执行器总内存必须适应JVM的堆内存大小。必须要设为正数。并且前提是 spark.memory.offHeap.enabled=true. spark.memory.useLegacyMode false 是否使用老式的内存管理模式(1.5以及之前)。老模式在堆内存管理上更死板,使用固定划分的区域做不同功能,潜在的会导致过多的数据溢出到磁盘(如果不小心调整性能)。必须启用本参数,以下选项才可用:spark.shuffle.memoryFractionspark.storage.memoryFractionspark.storage.unrollFraction spark.shuffle.memoryFraction 0.2 (废弃)必须先启用spark.memory.useLegacyMode这个才有用。 混洗阶段用于聚合和协同分组的JVM堆内存比例。在任何指定的时间,所有用于混洗的内存总和不会超过这个上限,超出的部分会溢出到磁盘上。如果溢出台频繁,考虑增加spark.storage.memoryFraction的大小。 spark.storage.memoryFraction 0.6 (废弃)必须先启用spark.memory.useLegacyMode这个才有用。 Spark用于缓存数据的对内存比例。这个值不应该比JVM 老生代(old generation)对象所占用的内存大,默认是60%的堆内存,当然你可以增加这个值,同时配置你所用的老生代对象占用内存大小。 spark.storage.unrollFraction 0.2 (废弃)必须先启用spark.memory.useLegacyMode这个才有用。 Spark块展开的内存占用比例。如果没有足够的内存来完整展开新的块,那么老的块将被抛弃。 执行行为 属性名 默认值 含义 spark.broadcast.blockSize 4m TorrentBroadcastFactory每个分片大小。太大会减少广播时候的并发数(更慢了);如果太小,BlockManager可能会给出性能提示。 spark.broadcast.factory org.apache.spark.broadcast. TorrentBroadcastFactory 广播算法的实现。 spark.cleaner.ttl (infinite) Spark记住任意元数据的保留时间(秒)。周期性的清理能保证比这个更老的元数据将被遗忘(删除)。这对于长期运行的Spark作业非常有用(如,一些7*24运行)。注意,RDD持久化到内存中后,过了这么长时间以后,也会被清理掉(这。。。是不是有点坑!)。 spark.executor.cores YARN模式下默认1;如果是独立部署,则是worker节点上所有可用的core。 单个执行器可用的core数。仅针对YARN和独立部署模式。独立部署时,单个worker节点上会运行多个执行器(executor),只要worker上有足够的core。否则,每个应用在单个worker上只会启动一个执行器。 spark.default.parallelism 对于reduceByKey和join这样的分布式混洗(shuffle)算子,等于父RDD中最大的分区。对于parallelize这样没有父RDD的算子,则取决于集群管理器: Local mode: number of cores on the local machine — 本地模式:机器的core数 Mesos fine grained mode: 8 — Mesos细粒度模式:8 Others: total number of cores on all executor nodes or 2, whichever is larger — 其他:所有执行器节点上core的数量 或者 2,这两数取较大的 如果用户没有在参数里指定,这个属性是默认的RDD transformation算子分区数,如:join,reduceByKey,parallelize等。 spark.executor.heartbeatInterval 10s 执行器心跳间隔(报告心跳给驱动器)。心跳机制使驱动器了解哪些执行器还活着,并且可以从心跳数据中获得执行器的度量数据。 spark.files.fetchTimeout 60s 获取文件的通讯超时,所获取的文件是通过在驱动器上调用SparkContext.addFile()添加的。 spark.files.useFetchCache true 如果设为true(默认),则同一个spark应用的不同执行器之间,会使用一二共享缓存来拉取文件,这样可以提升同一主机上运行多个执行器时候,任务启动的性能。如果设为false,这个优化就被禁用,各个执行器将使用自己独有的缓存,他们拉取的文件也是各自有一份拷贝。如果在NFS文件系统上使用本地文件系统,可以禁用掉这个优化(参考SPARK-6313) spark.files.overwrite false SparkContext.addFile()添加的文件已经存在,且内容不匹配的情况下,是否覆盖。 spark.hadoop.cloneConf false 如设为true,对每个任务复制一份Hadoop Configuration对象。启用这个可以绕过Configuration线程安全问题(SPARK-2546)。默认这个是禁用的,很多job并不会受这个issue的影响。 spark.hadoop.validateOutputSpecs true 如设为true,在saveAsHadoopFile及其变体的时候,将会验证输出(例如,检查输出目录是否存在)。对于已经验证过或确认存在输出目录的情况,可以禁用这个。我们建议不要禁用,除非你确定需要和之前的spark版本兼容。可以简单的利用Hadoop 文件系统API手动删掉已存在的输出目录。这个设置会被Spark Streaming StreamingContext生成的job忽略,因为Streaming需要在回复检查点的时候,覆盖已有的输出目录。 spark.storage.memoryMapThreshold 2m spark从磁盘上读取一个块后,映射到内存块的最小大小。这阻止了spark映射过小的内存块。通常,内存映射块是有开销的,应该比接近或小于操作系统的页大小。 spark.externalBlockStore.blockManager org.apache.spark.storage.TachyonBlockManager 用于存储RDD的外部块管理器(文件系统)的实现。 文件系统URL由spark.externalBlockStore.url决定。 spark.externalBlockStore.baseDir System.getProperty(“java.io.tmpdir”) 外部块存储存放RDD的目录。文件系统URL由spark.externalBlockStore.url决定。也可以是逗号分隔的目录列表(Tachyon文件系统) spark.externalBlockStore.url tachyon://localhost:19998 for Tachyon 所使用的外部块存储文件系统URL。 网络 属性名 默认值 含义 spark.akka.frameSize 128 “control plane” 通讯中所允许的最大消息大小(MB)。通常,只应用于map输出数据的大小信息,这些信息会在执行器和驱动器之间传递。如果你的job包含几千个map和reduce任务,你可能需要增大这个设置。 spark.akka.heartbeat.interval 1000s 设这么大的值,是为了禁用Akka传输失败检测器。也可以重新启用,如果你想用这个特性(但不建议)。设成较大的值可以减少网络开销,而较小的值(1秒左右)可能会对Akka的失败检测更有用。如有需要,可以调整这个值和spark.akka.heartbeat.pauses的组合。一种可能需要使用失败检测的情形是:用一个敏感的失败检测,可以快速识别并逐出不稳定的执行器。然而,在真实的spark集群中,这通常不是GC暂停或网络延迟造成的。除此之外,启用这个还会导致过多的心跳数据交换,从而造成网络洪峰。 spark.akka.heartbeat.pauses 6000s 设这么大的值,是为了禁用Akka传输失败检测器。也可以重新启用,如果你想用这个特性(但不建议)。这个是可接受的Akka心跳暂停时间。这个可以用来控制对GC暂停敏感程度。如有需要,可以调整这个值和spark.akka.heartbeat.interval的组合。 spark.akka.threads 4 用于通讯的actor线程数。如果驱动器机器上有很多CPU core,你可以适当增大这个值。 spark.akka.timeout 100s Spark节点之间通讯超时。 spark.blockManager.port (random) 块管理器(block manager)监听端口。在驱动器和执行器上都有。 spark.broadcast.port (random) 驱动器HTTP广播server监听端口。这和torrent广播没有关系。 spark.driver.host (local hostname) 驱动器主机名。用于和执行器以及独立部署时集群master通讯。 spark.driver.port (random) 驱动器端口。用于和执行器以及独立部署时集群master通讯。 spark.executor.port (random) 执行器端口。用于和驱动器通讯。 spark.fileserver.port (random) 驱动器HTTP文件server监听端口。 spark.network.timeout 120s 所有网络交互的默认超时。这个配置是以下属性的默认值:spark.core.connection.ack.wait.timeout,spark.akka.timeout,spark.storage.blockManagerSlaveTimeoutMs,spark.shuffle.io.connectionTimeout,spark.rpc.askTimeoutorspark.rpc.lookupTimeout spark.port.maxRetries 16 绑定一个端口的最大重试次数。如果指定了一个端口(非0),每个后续重试会在之前尝试的端口基础上加1,然后再重试绑定。本质上,这确定了一个绑定端口的范围,就是 [start port, start port + maxRetries] spark.replClassServer.port (random) 驱动器HTTP class server的监听端口。只和spark shell相关。 spark.rpc.numRetries 3 RPC任务最大重试次数。RPC任务最多重试这么多次。 spark.rpc.retry.wait 3s RPC请求操作重试前等待时间。 spark.rpc.askTimeout 120s RPC请求操作超时等待时间。 spark.rpc.lookupTimeout 120s RPC远程端点查询超时。 调度 属性名 默认值 含义 spark.cores.max (not set) 如果运行在独立部署集群模式(standalone deploy cluster)或者Mesos集群粗粒度共享模式(Mesos cluster in “coarse-grained” sharing mode),这个值决定了spark应用可以使用的最大CPU总数(应用在整个集群中可用CPU总数,而不是单个机器)。如果不设置,那么独立部署时默认为spark.deploy.defaultCores,Mesos集群则默认无限制(即所有可用的CPU)。 spark.locality.wait 3s 为了数据本地性最长等待时间(spark会根据数据所在位置,尽量让任务也启动于相同的节点,然而可能因为该节点上资源不足等原因,无法满足这个任务分配,spark最多等待这么多时间,然后放弃数据本地性)。数据本地性有多个级别,每一级别都是等待这么多时间(同一进程、同一节点、同一机架、任意)。你也可以为每个级别定义不同的等待时间,需要设置spark.locality.wait.node等。如果你发现任务数据本地性不佳,可以增加这个值,但通常默认值是ok的。 spark.locality.wait.node spark.locality.wait 单独定义同一节点数据本地性任务等待时间。你可以设为0,表示忽略节点本地性,直接跳到下一级别,即机架本地性(如果你的集群有机架信息)。 spark.locality.wait.process spark.locality.wait 单独定义同一进程数据本地性任务等待时间。这个参数影响试图访问特定执行器上的缓存数据的任务。 spark.locality.wait.rack spark.locality.wait 单独定义同一机架数据本地性等待时间。 spark.scheduler.maxRegisteredResourcesWaitingTime 30s 调度开始前,向集群管理器注册使用资源的最大等待时间。 spark.scheduler.minRegisteredResourcesRatio 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode 调度启动前,需要注册得到资源的最小比例(注册到的资源数 / 需要资源总数)(YARN模式下,资源是执行器;独立部署和Mesos粗粒度模式下时资源是CPU core【spark.cores.max是期望得到的资源总数】)。可以设为0.0~1.0的一个浮点数。不管job是否得到了最小资源比例,最大等待时间都是由spark.scheduler.maxRegisteredResourcesWaitingTime控制的。 spark.scheduler.mode FIFO 提交到同一个SparkContext上job的调度模式(scheduling mode)。另一个可接受的值是FAIR,而FIFO只是简单的把job按先来后到排队。对于多用户服务很有用。 spark.scheduler.revive.interval 1s 调度器复活worker的间隔时间。 spark.speculation false 如果设为true,将会启动推测执行任务。这意味着,如果stage中有任务执行较慢,他们会被重新调度到别的节点上执行。 spark.speculation.interval 100ms Spark检查慢任务的时间间隔。 spark.speculation.multiplier 1.5 比任务平均执行时间慢多少倍的任务会被认为是慢任务。 spark.speculation.quantile 0.75 对于一个stage来说,完成多少百分比才开始检查慢任务,并启动推测执行任务。 spark.task.cpus 1 每个任务分配的CPU core。 spark.task.maxFailures 4 单个任务最大失败次数。应该>=1。最大重试次数 =spark.task.maxFailures – 1 动态分配 属性名 默认值 含义 spark.dynamicAllocation.enabled false 是否启用动态资源分配特性,启用后,执行器的个数会根据工作负载动态的调整(增加或减少)。注意,目前在YARN模式下不用。更详细信息,请参考:here该特性依赖于 spark.shuffle.service.enabled 的启用。同时还和以下配置相关:spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors以及spark.dynamicAllocation.initialExecutors spark.dynamicAllocation .executorIdleTimeout 60s 动态分配特性启用后,空闲时间超过该配置时间的执行器都会被移除。更详细请参考这里:description spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 动态分配特性启用后,包含缓存数据的执行器如果空闲时间超过该配置设置的时间,则被移除。更详细请参考:description spark.dynamicAllocation.initialExecutors spark .dynamicAllocation .minExecutors 动态分配开启后,执行器的初始个数 spark.dynamicAllocation.maxExecutors infinity 动态分配开启后,执行器个数的上限 spark.dynamicAllocation.minExecutors 0 动态分配开启后,执行器个数的下限 spark.dynamicAllocation.schedulerBacklogTimeout 1s 动态分配启用后,如果有任务积压的持续时间长于该配置设置的时间,则申请新的执行器。更详细请参考:description spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout 和spark.dynamicAllocation.schedulerBacklogTimeout类似,只不过该配置对应于随后持续的执行器申请。更详细请参考:description 安全 属性名 默认值 含义 spark.acls.enable false 是否启用Spark acls(访问控制列表)。如果启用,那么将会检查用户是否有权限查看或修改某个作业(job)。注意,检查的前提是需要知道用户是谁,所以如果用户是null,则不会做任何检查。你可以在Spark UI上设置过滤器(Filters)来做用户认证,并设置用户名。 spark.admin.acls Empty 逗号分隔的用户列表,在该列表中的用户/管理员将能够访问和修改所有的Spark作业(job)。如果你的集群是共享的,并且有集群管理员,还有需要调试的开发人员,那么这个配置会很有用。如果想让所有人都有管理员权限,只需把该配置设置为”*” spark.authenticate false 设置Spark是否认证集群内部连接。如果不是在YARN上运行,请参考 spark.authenticate.secret spark.authenticate.secret None 设置Spark用于内部组件认证的秘钥。如果不是在YARN上运行,且启用了 spark.authenticate,那么该配置必须设置 spark.authenticate.enableSaslEncryption false 是否对Spark内部组件认证使用加密通信。该配置目前只有 block transfer service 使用。 spark.network.sasl.serverAlwaysEncrypt false 是否对支持SASL认证的service禁用非加密通信。该配置目前只有 external shuffle service 支持。 spark.core.connection.ack.wait.timeout 60s 网络连接等待应答信号的超时时间。为了避免由于GC等导致的意外超时,你可以设置一个较大的值。 spark.core.connection.auth.wait.timeout 30s 网络连接等待认证的超时时间。 spark.modify.acls Empty 逗号分隔的用户列表,在改列表中的用户可以修改Spark作业。默认情况下,只有启动该Spark作业的用户可以修改之(比如杀死该作业)。如果想要任何用户都可以修改作业,请将该配置设置为”*” spark.ui.filters None 逗号分隔的过滤器class列表,这些过滤器将用于Spark web UI。这里的过滤器应该是一个标准的javax servlet Filter。每个过滤器的参数可以通过java系统属性来设置,如下: spark.<class name of filer>.params=’param1=value1,param2=value2’例如: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params=’param1=foo,param2=testing’ spark.ui.view.acls Empty 逗号分隔的用户列表,在该列表中的用户可以查看Spark web UI。默认,只有启动该Spark作业的用户可以查看之。如果需要让所有用户都能查看,只需将该配置设为”*” 加密 属性名 默认值 含义 spark.ssl.enabled false 是否启用SSL连接(在所有所支持的协议上)。所有SSL相关配置(spark.ssl.xxx,其中xxx是一个特定的配置属性),都是全局的。如果需要在某些协议上覆盖全局设置,那么需要在该协议命名空间上进行单独配置。使用 spark.ssl.YYY.XXX 来为协议YYY覆盖全局配置XXX。目前YYY的可选值有 akka(用于基于AKKA框架的网络连接) 和 fs(用于应广播和文件服务器) spark.ssl.enabledAlgorithms Empty 逗号分隔的加密算法列表。这些加密算法必须是JVM所支持的。这里有个可用加密算法参考列表:this spark.ssl.keyPassword None 在key-store中私匙对应的密码。 spark.ssl.keyStore None key-store文件路径。可以是绝对路径,或者以本组件启动的工作目录为基础的相对路径。 spark.ssl.keyStorePassword None key-store的密码。 spark.ssl.protocol None 协议名称。该协议必须是JVM所支持的。这里有JVM支持的协议参考列表:this spark.ssl.trustStore None trust-store文件路径。可以是绝对路径,或者以本组件启动的工作目录为基础的相对路径。 spark.ssl.trustStorePassword None trust-store的密码 Spark Streaming [流式] 属性名 默认值 含义 spark.streaming.backpressure.enabled false 是否启用Spark Streaming 的内部反压机制(spark 1.5以上支持)。启用后,Spark Streaming会根据当前批次的调度延迟和处理时长来控制接收速率,这样一来,系统的接收速度会和处理速度相匹配。该特性会在内部动态地设置接收速率。该速率的上限将由 spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition 决定(如果它们设置了的话)。 spark.streaming.blockInterval 200ms 在将数据保存到Spark之前,Spark Streaming接收器组装数据块的时间间隔。建议不少于50ms。关于Spark Streaming编程指南细节,请参考performance tuning这一节。 spark.streaming.receiver.maxRate not set 接收速度的最大速率(每秒记录条数)。实际上,每个流每秒将消费这么多条记录。设置为0或者负数表示不限制速率。更多细节请参考:deployment guide spark.streaming.receiver.writeAheadLog.enable false 是否启用接收器预写日志。所有的输入数据都会保存到预写日志中,这样在驱动器失败后,可以基于预写日志来恢复数据。更详细请参考:deployment guide spark.streaming.unpersist true 是否强制Spark Streaming自动从内存中清理掉所生成并持久化的RDD。同时,Spark Streaming收到的原始数据也将会被自动清理掉。如果设置为false,那么原始数据以及持久化的RDD将不会被自动清理,以便外部程序可以访问这些数据。当然,这将导致Spark消耗更多的内存。 spark.streaming.stopGracefullyOnShutdown false 如果设为true,Spark将会在JVM关闭时,优雅地关停StreamingContext,而不是立即关闭之。 spark.streaming.kafka.maxRatePerPartition not set 在使用Kafka direct stream API时,从每个Kafka数据分区读取数据的最大速率(每秒记录条数)。更详细请参考:Kafka Integration guide spark.streaming.kafka.maxRetries 1 驱动器连续重试的最大次数,这个配置是为了让驱动器找出每个Kafka分区上的最大offset(默认值为1,意味着驱动器将最多尝试2次)。只对新的Kafka direct stream API有效。 spark.streaming.ui.retainedBatches 1000 Spark Streaming UI 以及 status API 中保留的最大批次个数。 SparkR 属性名 默认值 含义 spark.r.numRBackendThreads 2 SparkR RBackEnd处理RPC调用的后台线程数 spark.r.command Rscript 集群模式下,驱动器和worker上执行的R脚本可执行文件 spark.r.driver.command spark.r.command client模式的驱动器执行的R脚本。集群模式下会忽略 集群管理器 每个集群管理器都有一些额外的配置选项。详细请参考这里: YARN Mesos Standalone Mode 环境变量 有些Spark设置需要通过环境变量来设定,这些环境变量可以在${SPARK_HOME}/conf/spark-env.sh脚本(Windows下是conf/spark-env.cmd)中设置。如果是独立部署或者Mesos模式,这个文件可以指定机器相关信息(如hostname)。运行本地Spark应用或者submit脚本时,也会引用这个文件。 注意,conf/spark-env.sh默认是不存在的。你需要复制conf/spark-env.sh.template这个模板来创建,还有注意给这个文件附上可执行权限。 以下变量可以在spark-env.sh中设置: 环境变量 含义 JAVA_HOME Java安装目录(如果没有在PATH变量中指定) PYSPARK_PYTHON 驱动器和worker上使用的Python二进制可执行文件(默认是python) PYSPARK_DRIVER_PYTHON 仅在驱动上使用的Python二进制可执行文件(默认同PYSPARK_PYTHON) SPARKR_DRIVER_R SparkR shell使用的R二进制可执行文件(默认是R) SPARK_LOCAL_IP 本地绑定的IP SPARK_PUBLIC_DNS Spark程序公布给其他机器的hostname 另外,还有一些选项需要在Sparkstandalone cluster scripts里设置,如:每台机器上使用的core数量,和最大内存占用量。 spark-env.sh是一个shell脚本,因此一些参数可以通过编程方式来设定 – 例如,你可以获取本机IP来设置SPARK_LOCAL_IP。 日志配置 Spark使用log4j打日志。你可以在conf目录下用log4j.properties来配置。复制该目录下已有的log4j.properties.template并改名为log4j.properties即可。 覆盖配置目录 默认Spark配置目录是”${SPARK_HOME}/conf”,你也可以通过 ${SPARK_CONF_DIR}指定其他目录。Spark会从这个目录下读取配置文件(spark-defaults.conf,spark-env.sh,log4j.properties等) 继承Hadoop集群配置 如果你打算用Spark从HDFS读取数据,那么有2个Hadoop配置文件必须放到Spark的classpath下: hdfs-site.xml,配置HDFS客户端的默认行为 core-site.xml,默认文件系统名 这些配置文件的路径在不同发布版本中不太一样(如CDH和HDP版本),但通常都能在 ${HADOOP_HOME}/etc/hadoop/conf目录下找到。一些工具,如Cloudera Manager,可以动态修改配置,而且提供了下载一份拷贝的机制。 要想让这些配置对Spark可见,请在${SPARK_HOME}/spark-env.sh中设置HADOOP_CONF_DIR变量。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》Spark配置(一)

Spark配置 Spark有以下三种方式修改配置: Spark properties(Spark属性)可以控制绝大多数应用程序参数,而且既可以通过SparkConf对象来设置,也可以通过Java系统属性来设置。 Environment variables(环境变量)可以指定一些各个机器相关的设置,如IP地址,其设置方法是写在每台机器上的conf/spark-env.sh中。 Logging(日志)可以通过log4j.properties配置日志。 Spark属性 Spark属性可以控制大多数的应用程序设置,并且每个应用的设定都是分开的。这些属性可以用SparkConf对象直接设定。SparkConf为一些常用的属性定制了专用方法(如,master URL和application name),其他属性都可以用键值对做参数,调用set()方法来设置。例如,我们可以初始化一个包含2个本地线程的Spark应用,代码如下: 注意,local[2]代表2个本地线程 – 这是最小的并发方式,可以帮助我们发现一些只有在分布式上下文才能复现的bug。 val conf = new SparkConf() .setMaster("local[2]") .setAppName("CountingSheep") val sc = new SparkContext(conf) 注意,本地模式下,我们可以使用n个线程(n >= 1)。而且在像Spark Streaming这样的场景下,我们可能需要多个线程来防止类似线程饿死这样的问题。 配置时间段的属性应该写明时间单位,如下格式都是可接受的: 25ms (milliseconds) 5s (seconds) 10m or 10min (minutes) 3h (hours) 5d (days) 1y (years) 配置大小的属性也应该写明单位,如下格式都是可接受的: 1b (bytes) 1k or 1kb (kibibytes = 1024 bytes) 1m or 1mb (mebibytes = 1024 kibibytes) 1g or 1gb (gibibytes = 1024 mebibytes) 1t or 1tb (tebibytes = 1024 gibibytes) 1p or 1pb (pebibytes = 1024 tebibytes) 动态加载Spark属性 在某些场景下,你可能需要避免将属性值写死在 SparkConf 中。例如,你可能希望在同一个应用上使用不同的master或不同的内存总量。Spark允许你简单地创建一个空的SparkConf对象: val sc = new SparkContext(new SparkConf()) 然后在运行时设置这些属性: ./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar Spark shell和spark-submit工具支持两种动态加载配置的方法。第一种,通过命令行选项,如:上面提到的–master(设置master URL)。spark-submit可以在启动Spark应用时,通过–conf标志接受任何属性配置,同时有一些特殊配置参数同样可用(如,–master)。运行./bin/spark-submit –help可以展示这些选项的完整列表。 同时,bin/spark-submit 也支持从conf/spark-defaults.conf 中读取配置选项,在该文件中每行是一个键值对,并用空格分隔,如下: spark.master spark://5.6.7.8:7077 spark.executor.memory 4g spark.eventLog.enabled true spark.serializer org.apache.spark.serializer.KryoSerializer 这些通过参数或者属性配置文件传递的属性,最终都会在SparkConf 中合并。其优先级是:首先是SparkConf代码中写的属性值,其次是spark-submit或spark-shell的标志参数,最后是spark-defaults.conf文件中的属性。 有一些配置项被重命名过,这种情形下,老的名字仍然是可以接受的,只是优先级比新名字优先级低。 查看Spark属性 每个SparkContext都有其对应的Spark UI,所以Spark应用程序都能通过Spark UI查看其属性。默认你可以在这里看到:http://<driver>:4040,页面上的”Environment“ tab页可以查看Spark属性。如果你真的想确认一下属性设置是否正确的话,这个功能就非常有用了。注意,只有显式地通过SparkConf对象、在命令行参数、或者spark-defaults.conf设置的参数才会出现在页面上。其他属性,你可以认为都是默认值。 可用的属性 绝大多数属性都有合理的默认值。这里是部分常用的选项: 应用属性 属性名称 默认值 含义 spark.app.name (none) Spark应用的名字。会在SparkUI和日志中出现。 spark.driver.cores 1 在cluster模式下,用几个core运行驱动器(driver)进程。 spark.driver.maxResultSize 1g Spark action算子返回的结果最大多大。至少要1M,可以设为0表示无限制。如果结果超过这一大小,Spark作业(job)会直接中断退出。但是,设得过高有可能导致驱动器OOM(out-of-memory)(取决于spark.driver.memory设置,以及驱动器JVM的内存限制)。设一个合理的值,以避免驱动器OOM。 spark.driver.memory 1g 驱动器进程可以用的内存总量(如:1g,2g)。 注意,在客户端模式下,这个配置不能在SparkConf中直接设置(因为驱动器JVM都启动完了呀!)。驱动器客户端模式下,必须要在命令行里用 –driver-memory 或者在默认属性配置文件里设置。 spark.executor.memory 1g 单个执行器(executor)使用的内存总量(如,2g,8g) spark.extraListeners (none) 逗号分隔的SparkListener子类的类名列表;初始化SparkContext时,这些类的实例会被创建出来,并且注册到Spark的监听总线上。如果这些类有一个接受SparkConf作为唯一参数的构造函数,那么这个构造函数会被优先调用;否则,就调用无参数的默认构造函数。如果没有构造函数,SparkContext创建的时候会抛异常。 spark.local.dir /tmp Spark的”草稿“目录,包括map输出的临时文件,或者RDD存在磁盘上的数据。这个目录最好在本地文件系统中,这样读写速度快。这个配置可以接受一个逗号分隔的列表,通常用这种方式将文件IO分散不同的磁盘上去。 注意:Spark-1.0及以后版本中,这个属性会被集群管理器所提供的环境变量覆盖:SPARK_LOCAL_DIRS(独立部署或Mesos)或者 LOCAL_DIRS(YARN)。 spark.logConf false SparkContext启动时是否把生效的 SparkConf 属性以INFO日志打印到日志里 spark.master (none) 集群管理器URL。参考allowed master URL’s. 除了这些以外,以下还有很多可用的参数配置,在某些特定情形下,可能会用到: 运行时环境 属性名 默认值 含义 spark.driver.extraClassPath (none) 额外的classpath,将插入到到驱动器的classpath开头。 注意:驱动器如果运行客户端模式下,这个配置不能通过SparkConf 在程序里配置,因为这时候程序已经启动呀!而是应该用命令行参数(–driver-class-path)或者在 conf/spark-defaults.conf 配置。 spark.driver.extraJavaOptions (none) 驱动器额外的JVM选项。如:GC设置或其他日志参数。 注意:驱动器如果运行客户端模式下,这个配置不能通过SparkConf在程序里配置,因为这时候程序已经启动呀!而是应该用命令行参数(–driver-java-options)或者conf/spark-defaults.conf 配置。 spark.driver.extraLibraryPath (none) 启动驱动器JVM时候指定的依赖库路径。 注意:驱动器如果运行客户端模式下,这个配置不能通过SparkConf在程序里配置,因为这时候程序已经启动呀!而是应该用命令行参数(–driver-library-path)或者conf/spark-defaults.conf 配置。 spark.driver.userClassPathFirst false (试验性的:即未来不一定会支持该配置) 驱动器是否首选使用用户指定的jars,而不是spark自身的。这个特性可以用来处理用户依赖和spark本身依赖项之间的冲突。目前还是试验性的,并且只能用在集群模式下。 spark.executor.extraClassPath (none) 添加到执行器(executor)classpath开头的classpath。主要为了向后兼容老的spark版本,不推荐使用。 spark.executor.extraJavaOptions (none) 传给执行器的额外JVM参数。如:GC设置或其他日志设置等。注意,不能用这个来设置Spark属性或者堆内存大小。Spark属性应该用SparkConf对象,或者spark-defaults.conf文件(会在spark-submit脚本中使用)来配置。执行器堆内存大小应该用 spark.executor.memory配置。 spark.executor.extraLibraryPath (none) 启动执行器JVM时使用的额外依赖库路径。 spark.executor.logs.rolling.maxRetainedFiles (none) Sets the number of latest rolling log files that are going to be retained by the system. Older log files will be deleted. Disabled by default.设置日志文件最大保留个数。老日志文件将被干掉。默认禁用的。 spark.executor.logs.rolling.maxSize (none) 设置执行器日志文件大小上限。默认禁用的。 需要自动删日志请参考 spark.executor.logs.rolling.maxRetainedFiles. spark.executor.logs.rolling.strategy (none) 执行器日志滚动策略。默认禁用。 可接受的值有”time”(基于时间滚动) 或者 “size”(基于文件大小滚动)。 time:将使用 spark.executor.logs.rolling.time.interval设置滚动时间间隔 size:将使用 spark.executor.logs.rolling.size.maxBytes设置文件大小上限 spark.executor.logs.rolling.time.interval daily 设置执行器日志滚动时间间隔。日志滚动默认是禁用的。 可用的值有 “daily”, “hourly”, “minutely”,也可设为数字(则单位为秒)。 关于日志自动清理,请参考 spark.executor.logs.rolling.maxRetainedFiles spark.executor.userClassPathFirst false (试验性的)与 spark.driver.userClassPathFirst类似,只不过这个参数将应用于执行器 spark.executorEnv.[EnvironmentVariableName] (none) 向执行器进程增加名为EnvironmentVariableName的环境变量。用户可以指定多个来设置不同的环境变量。 spark.python.profile false 对Python worker启用性能分析,性能分析结果会在sc.show_profile()中,或者在驱动器退出前展示。也可以用sc.dump_profiles(path)输出到磁盘上。如果部分分析结果被手动展示过,那么驱动器退出前就不再自动展示了。默认会使用pyspark.profiler.BasicProfiler,也可以自己传一个profiler 类参数给SparkContext构造函数。 spark.python.profile.dump (none) 这个目录是用来在驱动器退出前,dump性能分析结果。性能分析结果会按RDD分别dump。同时可以使用ptats.Stats()来装载。如果制定了这个,那么分析结果就不再自动展示。 spark.python.worker.memory 512m 聚合时每个python worker使用的内存总量,和JVM的内存字符串格式相同(如,512m,2g)。如果聚合时使用的内存超过这个量,就将数据溢出到磁盘上。 spark.python.worker.reuse true 是否复用Python worker。如果是,则每个任务会启动固定数量的Python worker,并且不需要fork() python进程。如果需要广播的数据量很大,设为true能大大减少广播数据量,因为需要广播的进程数减少了。 混洗行为 属性名 默认值 含义 spark.reducer.maxSizeInFlight 48m map任务输出同时reduce任务获取的最大内存占用量。每个输出需要创建buffer来接收,对于每个reduce任务来说,有一个固定的内存开销上限,所以最好别设太大,除非你内存非常大。 spark.shuffle.compress true 是否压缩map任务的输出文件。通常来说,压缩是个好主意。使用的压缩算法取决于 spark.io.compression.codec spark.shuffle.file.buffer 32k 每个混洗输出流的内存buffer大小。这个buffer能减少混洗文件的创建和磁盘寻址。 spark.shuffle.io.maxRetries 3 (仅对netty)如果IO相关异常发生,重试次数(如果设为非0的话)。重试能是大量数据的混洗操作更加稳定,因为重试可以有效应对长GC暂停或者网络闪断。 spark.shuffle.io.numConnectionsPerPeer 1 (仅netty)主机之间的连接是复用的,这样可以减少大集群中重复建立连接的次数。然而,有些集群是机器少,磁盘多,这种集群可以考虑增加这个参数值,以便充分利用所有磁盘并发性能。 spark.shuffle.io.preferDirectBufs true (仅netty)堆外缓存可以有效减少垃圾回收和缓存复制。对于堆外内存紧张的用户来说,可以考虑禁用这个选项,以迫使netty所有内存都分配在堆上。 spark.shuffle.io.retryWait 5s (仅netty)混洗重试获取数据的间隔时间。默认最大重试延迟是15秒,设置这个参数后,将变成maxRetries* retryWait。 spark.shuffle.manager sort 混洗数据的实现方式。可用的有”sort”和”hash“。基于排序(sort)的混洗内存利用率更高,并且从1.2开始已经是默认值了。 spark.shuffle.service.enabled false 启用外部混洗服务。启用外部混洗服务后,执行器生成的混洗中间文件就由该服务保留,这样执行器就可以安全的退出了。如果 spark.dynamicAllocation.enabled启用了,那么这个参数也必须启用,这样动态分配才能有外部混洗服务可用。 更多请参考:dynamic allocation configuration and setup documentation spark.shuffle.service.port 7337 外部混洗服务对应端口 spark.shuffle.sort.bypassMergeThreshold 200 (高级)在基于排序(sort)的混洗管理器中,如果没有map端聚合的话,就会最多存在这么多个reduce分区。 spark.shuffle.spill.compress true 是否在混洗阶段压缩溢出到磁盘的数据。压缩算法取决于spark.io.compression.codec Spark UI 属性名 默认值 含义 spark.eventLog.compress false 是否压缩事件日志(当然spark.eventLog.enabled必须开启) spark.eventLog.dir file:///tmp/spark-events Spark events日志的基础目录(当然spark.eventLog.enabled必须开启)。在这个目录中,spark会给每个应用创建一个单独的子目录,然后把应用的events log打到子目录里。用户可以设置一个统一的位置(比如一个HDFS目录),这样history server就可以从这里读取历史文件。 spark.eventLog.enabled false 是否启用Spark事件日志。如果Spark应用结束后,仍需要在SparkUI上查看其状态,必须启用这个。 spark.ui.killEnabled true 允许从SparkUI上杀掉stage以及对应的作业(job) spark.ui.port 4040 SparkUI端口,展示应用程序运行状态。 spark.ui.retainedJobs 1000 SparkUI和status API最多保留多少个spark作业的数据(当然是在垃圾回收之前) spark.ui.retainedStages 1000 SparkUI和status API最多保留多少个spark步骤(stage)的数据(当然是在垃圾回收之前) spark.worker.ui.retainedExecutors 1000 SparkUI和status API最多保留多少个已结束的执行器(executor)的数据(当然是在垃圾回收之前) spark.worker.ui.retainedDrivers 1000 SparkUI和status API最多保留多少个已结束的驱动器(driver)的数据(当然是在垃圾回收之前) spark.sql.ui.retainedExecutions 1000 SparkUI和status API最多保留多少个已结束的执行计划(execution)的数据(当然是在垃圾回收之前) spark.streaming.ui.retainedBatches 1000 SparkUI和status API最多保留多少个已结束的批量(batch)的数据(当然是在垃圾回收之前) 压缩和序列化 属性名 默认值 含义 spark.broadcast.compress true 是否在广播变量前使用压缩。通常是个好主意。 spark.closure.serializer org.apache.spark.serializer. JavaSerializer 闭包所使用的序列化类。目前只支持Java序列化。 spark.io.compression.codec snappy 内部数据使用的压缩算法,如:RDD分区、广播变量、混洗输出。Spark提供了3中算法:lz4,lzf,snappy。你也可以使用全名来指定压缩算法:org.apache.spark.io.LZ4CompressionCodec,org.apache.spark.io.LZFCompressionCodec,org.apache.spark.io.SnappyCompressionCodec. spark.io.compression.lz4.blockSize 32k LZ4算法使用的块大小。当然你需要先使用LZ4压缩。减少块大小可以减少混洗时LZ4算法占用的内存量。 spark.io.compression.snappy.blockSize 32k Snappy算法使用的块大小(先得使用Snappy算法)。减少块大小可以减少混洗时Snappy算法占用的内存量。 spark.kryo.classesToRegister (none) 如果你使用Kryo序列化,最好指定这个以提高性能(tuning guide)。 本参数接受一个逗号分隔的类名列表,这些类都会注册为Kryo可序列化类型。 spark.kryo.referenceTracking true (false when using Spark SQL Thrift Server) 是否跟踪同一对象在Kryo序列化的引用。如果你的对象图中有循环护着包含统一对象的多份拷贝,那么最好启用这个。如果没有这种情况,那就禁用以提高性能。 spark.kryo.registrationRequired false Kryo序列化时,是否必须事先注册。如果设为true,那么Kryo遇到没有注册过的类型,就会抛异常。如果设为false(默认)Kryo会序列化未注册类型的对象,但会有比较明显的性能影响,所以启用这个选项,可以强制必须在序列化前,注册可序列化类型。 spark.kryo.registrator (none) 如果你使用Kryo序列化,用这个class来注册你的自定义类型。如果你需要自定义注册方式,这个参数很有用。否则,使用 spark.kryo.classesRegister更简单。要设置这个参数,需要用KryoRegistrator的子类。详见:tuning guide。 spark.kryoserializer.buffer.max 64m 最大允许的Kryo序列化buffer。必须必你所需要序列化的对象要大。如果你在Kryo中看到”buffer limit exceeded”这个异常,你就得增加这个值了。 spark.kryoserializer.buffer 64k Kryo序列化的初始buffer大小。注意,每台worker上对应每个core会有一个buffer。buffer最大增长到 spark.kryoserializer.buffer.max spark.rdd.compress false 是否压缩序列化后RDD的分区(如:StorageLevel.MEMORY_ONLY_SER)。能节省大量空间,但多消耗一些CPU。 spark.serializer org.apache.spark.serializer. JavaSerializer (org.apache.spark.serializer. KryoSerializer when using Spark SQL Thrift Server) 用于序列化对象的类,序列化后的数据将通过网络传输,或从缓存中反序列化回来。默认的Java序列化使用java的Serializable接口,但速度较慢,所以我们建议使用usingorg.apache.spark.serializer.KryoSerializerand configuring Kryo serialization如果速度需要保证的话。当然你可以自定义一个序列化器,通过继承org.apache.spark.Serializer. spark.serializer.objectStreamReset 100 如果使用org.apache.spark.serializer.JavaSerializer做序列化器,序列化器缓存这些对象,以避免输出多余数据,然而,这个会打断垃圾回收。通过调用reset来flush序列化器,从而使老对象被回收。要禁用这一周期性reset,需要把这个参数设为-1,。默认情况下,序列化器会每过100个对象,被reset一次。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark官方文档》提交Spark应用

提交Spark应用 spark-submit脚本在Spark的bin目录下,可以利用此脚本向集群提交Spark应用。该脚本为所有Spark所支持的集群管理器(cluster managers)提供了统一的接口,因此,你基本上可以用同样的配置和脚本,向不同类型的集群管理器提交你的应用。 打包应用程序依赖 如果你的代码依赖于其他工程,那么你需要把依赖项也打包进来,并发布给Spark集群。这需要创建一个程序集jar包(或者uber jar),包含你自己的代码,同时也包含其依赖项。sbtandMaven都有assembly插件。创建程序集jar包时,注意,要把Spark和Hadoop的jar包都可设为provided;这些jar包在Spark集群上已经存在,不需要再打包进来。完成jar包后,你就可以使用bin/spark-submit来提交你的jar包了。 对于Python,你可以使用spark-submit的–py-files参数,将你的程序以.py、.zip 或.egg文件格式提交给集群。如果你需要依赖很多Python文件,我们推荐你使用.zip或者.egg来打包。 利用spark-submit启动应用 一旦打包好一个应用程序,你就可以用bin/spark-submit来提交之。这个脚本会自动设置Spark及其依赖的classpath,同时可以支持多种不同类型的集群管理器、以及不同的部署模式: ./bin/spark-submit \ --class <main-class> --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # 其他选项 <application-jar> \ [application-arguments] 一些常用的选项如下: --class: 应用入口类(例如:org.apache.spark.examples.SparkPi)) --master: 集群的master URL(如:spark://23.195.26.187:7077) --deploy-mode:驱动器进程是在集群上工作节点运行(cluster),还是在集群之外客户端运行(client)(默认:client) --conf: 可以设置任意的Spark配置属性,键值对(key=value)格式。如果值中包含空白字符,可以用双引号括起来(”key=value“)。 application-jar: 应用程序jar包路径,该jar包必须包括你自己的代码及其所有的依赖项。如果是URL,那么该路径URL必须是对整个集群可见且一致的,如:hdfs://path 或者file://path(要求对所有节点都一致) application-arguments: 传给入口类main函数的启动参数,如果有的话。 一种常见的部署策略是,在一台网关机器上提交你的应用,这样距离工作节点的物理距离比较近。这种情况下,client模式会比较适合。client模式下,驱动器直接运行在spark-submit的进程中,同时驱动器对于集群来说就像是一个客户端。应用程序的输入输出也被绑定到控制台上。因此,这种模式特别适用于交互式执行(REPL),spark-shell就是这种模式。 当然,你也可以从距离工作节点很远的机器(如:你的笔记本)上提交应用,这种情况下,通常适用cluster模式,以减少网络驱动器和执行器之间的网络通信延迟。注意:对于Mesos集群管理器,Spark还不支持cluster模式。目前,只有YARN上Python应用支持cluster模式。 对于Python应用,只要把<application-jar>换成一个.py文件,再把.zip、.egg或者.py文件传给–py-files参数即可。 有一些参数是专门用于设置集群管理器的(cluster manager)。例如,在独立部署(Spark standalone cluster)时,并且使用cluster模式,你可以用–supervise参数来确保驱动器在异常退出情况下(退出并返回非0值)自动重启。spark-submit –help可查看完整的选项列表。这里有几个常见的示例: # 本地运行,占用8个core ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master local[8] \ /path/to/examples.jar \ 100 # 独立部署,client模式 ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ 1000 # 独立部署,cluster模式,异常退出时自动重启 ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --deploy-mode cluster --supervise --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ 1000 # YARN上运行,cluster模式 export HADOOP_CONF_DIR=XXX ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ # 要client模式就把这个设为client --executor-memory 20G \ --num-executors 50 \ /path/to/examples.jar \ 1000 # 独立部署,运行python ./bin/spark-submit \ --master spark://207.184.161.138:7077 \ examples/src/main/python/pi.py \ 1000 # Mesos集群上运行,cluster模式,异常时自动重启 ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master mesos://207.184.161.138:7077 \ --deploy-mode cluster --supervise --executor-memory 20G \ --total-executor-cores 100 \ http://path/to/examples.jar \ 1000 Master URLs 传给Spark的master URL可以是以下几种格式: Master URL 含义 local 本地运行Spark,只用1个worker线程(没有并行计算) local[K] 本地运行Spark,使用K个worker线程(理论上,最好将这个值设为你机器上CPU core的个数) local[*] 本地运行Spark,使用worker线程数同你机器上逻辑CPU core个数 spark://HOST:PORT 连接到指定的Spark独立部署的集群管理器(Spark standalone cluster)。端口是可以配置的,默认7077。 mesos://HOST:PORT 连接到指定的Mesos集群。端口号可以配置,默认5050。如果Mesos集群依赖于ZooKeeper,可以使用 mesos://zk://… 来提交,注意 –deploy-mode需要设置为cluster,同时,HOST:PORT应指向MesosClusterDispatcher. yarn 连接到指定的YARN集群,使用–deploy-mode来指定 client模式 或是 cluster 模式。YARN集群位置需要通过 $HADOOP_CONF_DIR 或者 $YARN_CONF_DIR 变量来查找。 yarn-client YARN client模式的简写,等价于 –master yarn –deploy-mode client yarn-cluster YARN cluster模式的简写,等价于 –master yarn –deploy-mode cluster 从文件加载配置 spark-submit脚本可以从一个属性文件加载默认的Spark属性配置值(Spark configuration values),并将这些属性传给你的应用程序。默认Spark会从 conf/spark-defaults.conf读取这些属性配置。更详细信息,请参考loading default configurations. 用这种方式加载默认Spark属性配置,可以在调用spark-submit脚本时省略一些参数标志。例如:如果属性文件中设置了spark.master属性,那么你就以忽略spark-submit的–master参数。通常,在代码里用SparkConf中设置的参数具有最高的优先级,其次是spark-submit中传的参数,再次才是spark-defaults.conf文件中的配置值。 如果你总是搞不清楚最终生效的配置值是从哪里来的,你可以通过spark-submit的–verbose选项来打印细粒度的调试信息。 高级依赖管理 通过spark-submit提交应用时,application jar和–jars选项中的jar包都会被自动传到集群上。Spark支持以下URL协议,并采用不同的分发策略: file:– 文件绝对路径,并且file:/URI是通过驱动器的HTTP文件服务器来下载的,每个执行器都从驱动器的HTTP server拉取这些文件。 hdfs:,http:,https:,ftp:– 设置这些参数后,Spark将会从指定的URI位置下载所需的文件和jar包。 local:– local:/ 打头的URI用于指定在每个工作节点上都能访问到的本地或共享文件。这意味着,不会占用网络IO,特别是对一些大文件或jar包,最好使用这种方式,当然,你需要把文件推送到每个工作节点上,或者通过NFS和GlusterFS共享文件。 注意,每个SparkContext对应的jar包和文件都需要拷贝到所对应执行器的工作目录下。一段时间之后,这些文件可能会占用相当多的磁盘。在YARN上,这些清理工作是自动完成的;而在Spark独立部署时,这种自动清理需要配置 spark.worker.cleanup.appDataTtl 属性。 用户还可以用–packages参数,通过给定一个逗号分隔的maven坐标,来指定其他依赖项。这个命令会自动处理依赖树。额外的maven库(或者SBT resolver)可以通过–repositories参数来指定。Spark命令(pyspark,spark-shell,spark-submit)都支持这些参数。 对于Python,也可以使用等价的–py-files选项来分发.egg、.zip以及.py文件到执行器上。 更多信息 部署完了你的应用程序后,cluster mode overview描述了分布式执行中所涉及的各个组件,以及如何监控和调试应用程序。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》Spark作业调度

Spark作业调度 概览 Spark有好几种计算资源调度的方式。首先,回忆一下集群模式概览(cluster mode overview)中每个Spark应用(包含一个SparkContext实例)中运行了一些其独占的执行器(executor)进程。集群管理器提供了Spark应用之间的资源调度(scheduling across applications)。其次,在各个Spark应用内部,各个线程可能并发地通过action算子提交多个Spark作业(job)。如果你的应用服务于网络请求,那这种情况是很常见的。在Spark应用内部(对应同一个SparkContext)各个作业之间,Spark默认FIFO调度,同时也可以支持公平调度(fair scheduler)。 Spark应用之间的资源调度 如果在集群上运行,每个Spark应用都会获得一批独占的执行器JVM,来运行其任务并存储数据。如果有多个用户共享集群,那么会有很多资源分配相关的选项,如何设置还取决于具体的集群管理器。 对Spark所支持的各个集群管理器而言,最简单的资源分配,就是对资源静态划分。这种方式就意味着,每个Spark应用都是设定一个最大可用资源总量,并且该应用在整个生命周期内都会占住这些资源。这种方式在Spark独立部署(standalone)和YARN调度,以及Mesos粗粒度模式(coarse-grained Mesos mode)下都可用。 Standalone mode:默认情况下,Spark应用在独立部署的集群中都会以FIFO(first-in-first-out)模式顺序提交运行,并且每个Spark应用都会占用集群中所有可用节点。不过你可以通过设置spark.cores.max或者spark.deploy.defaultCores 来限制单个应用所占用的节点个数。最后,除了可以控制对CPU的使用数量之外,还可以通过 spark.executor.memory 来控制各个应用的内存占用量。 Mesos:在Mesos中要使用静态划分的话,需要将 spark.mesos.coarse 设为true,同样,你也需要设置 spark.cores.max 来控制各个应用的CPU总数,以及 spark.executor.memory 来控制各个应用的内存占用。 YARN:在YARN中需要使用 –num-executors 选项来控制Spark应用在集群中分配的执行器的个数,对于单个执行器(executor)所占用的资源,可以使用 –executor-memory 和 –executor-cores 来控制。 Mesos上另一种可用的方式是动态共享CPU。在这种模式下,每个Spark应用的内存占用仍然是固定且独占的(仍由spark.executor.memory决定),但是如果该Spark应用没有在某个机器上执行任务的话,那么其他应用可以占用该机器上的CPU。这种模式对集群中有大量不是很活跃应用的场景非常有效,例如:集群中有很多不同用户的Spark shell session。但这种模式不适用于低延迟的场景,因为当Spark应用需要使用CPU的时候,可能需要等待一段时间才能取得CPU的使用权。要使用这种模式,只需要在 mesos:// URL 上设置 spark.mesos.coarse 属性为flase即可。 注意,目前还没有任何一种资源分配模式能支持跨Spark应用的内存共享。如果你需要跨Spark应用共享内存,我们建议你用单独用一个server来计算和保留同一个RDD查询的结果,这样就能在多个请求(request)之间共享同一个RDD的数据。在未来的发布版本中,一些内存存储系统(如:Tachyon)或许能够提供这种跨Spark应用共享RDD的能力。 动态资源分配 Spark还提供了一种基于负载来动态调节Spark应用资源占用的机制。这意味着,你的应用会在资源空闲的时候将其释放给集群,而后续用到的时候再重新申请。这一特性在多个应用共享Spark集群资源的情况下特别有用。 注意,这个特性默认是禁用的,但是在所有的粗粒度集群管理器上都是可用的,如:独立部署模式(standalone mode),YARN模式(YARN mode)以及Mesos粗粒度模式(Mesos coarse-grained mode)。 配置和部署 要使用这一特性有两个前提条件。首先,你的应用必须设置 spark.dynamicAllocation.enabled 为 true。其次,你必须在每个节点上启动一个外部混洗服务(external shuffle service),并在你的应用中将 spark.shuffle.service.enabled 设为true。外部混洗服务的目的就是为了在删除执行器的时候,能够保留其输出的混洗文件(本文后续有更详细的描述)。启用外部混洗的方式在各个集群管理器上各不相同: 在Spark独立部署的集群中,你只需要在worker启动前设置 spark.shuffle.server.enabled 为true即可。 在Mesos粗粒度模式下,你需要在各个节点上运行 ${SPARK_HOME}/sbin/start-mesos-shuffle-service.sh 并设置 spark.shuffle.service.enabled 为true 即可。例如,你可以用Marathon来启用这一功能。 在YARN模式下,混洗服务需要按以下步骤在各个NodeManager上启动: 首先按照YARN profile构建Spark。如果你已经有打好包的Spark,可以忽略这一步。 找到 spark-<version>-yarn-shuffle.jar。如果你是自定义编译,其位置应该在 ${SPARK_HOME}/network/yarn/target/scala-<version>,否则应该可以在 lib 目录下找到这个jar包。 将该jar包添加到NodeManager的classpath路径中。 配置各个节点上的yarn-site.xml,将 spark_shuffle 添加到 yarn.nodemanager.aux-services 中,然后将 yarn.nodemanager.aux-services.spark_shuffle.class 设为 org.apache.spark.network.yarn.YarnShuffleService,并将 spark.shuffle.service.enabled 设为 true。 最后重启各节点上的NodeManager。 所有相关的配置都是可选的,并且都在 spark.dynamicAllocation.* 和 spark.shuffle.service.* 命名空间下。更详细请参考:configurations page。 资源分配策略 总体上来说,Spark应该在执行器空闲时将其关闭,而在后续要用时再次申请。因为没有一个固定的方法,可以预测一个执行器在后续是否马上回被分配去执行任务,或者一个新分配的执行器实际上是空闲的,所以我们需要一些试探性的方法,来决定是否申请或移除一个执行器。 请求策略 一个启用了动态分配的Spark应用会在有等待任务需要调度的时候,申请额外的执行器。这种情况下,必定意味着已有的执行器已经不足以同时执行所有未完成的任务。 Spark会分轮次来申请执行器。实际的资源申请,会在任务挂起 spark.dynamicAllocation.schedulerBacklogTimeout 秒后首次触发,其后如果等待队列中仍有挂起的任务,则每过 spark.dynamicAlloction.sustainedSchedulerBacklogTimeout 秒触发一次资源申请。另外,每一轮所申请的执行器个数以指数形式增长。例如,一个Spark应用可能在首轮申请1个执行器,后续的轮次申请个数可能是2个、4个、8个… … 。 采用指数级增长策略的原因有两个:第一,对于任何一个Spark应用如果只是需要多申请少数几个执行器的话,那么必须非常谨慎地启动资源申请,这和TCP慢启动有些类似;第二,如果一旦Spark应用确实需要申请很多个执行器的话,那么可以确保其所需的计算资源及时地增长。 移除策略 移除执行器的策略就简单多了。Spark应用会在某个执行器空闲超过 spark.dynamicAllocation.executorIdleTimeout 秒后将其删除。在绝大多数情况下,执行器的移除条件和申请条件都是互斥的,也就是说,执行器在有待执行任务挂起时,不应该空闲。 优雅地关闭执行器 非动态分配模式下,执行器可能的退出原因有执行失败或者相关Spark应用已经退出。不管是那种原因,执行器的所有状态都已经不再需要,可以丢弃掉。但在动态分配的情形下,执行器有可能在Spark应用运行期间被移除。这时候,如果Spark应用尝试去访问该执行器存储的状态,就必须重算这一部分数据。因此,Spark需要一种机制,能够优雅地关闭执行器,同时还保留其状态数据。 这种需求对于混洗操作尤其重要。混洗过程中,Spark执行器首先将map输出写到本地磁盘,同时执行器本身又是一个文件服务器,这样其他执行器就能够通过该执行器获得对应的map结果数据。一旦有某些任务执行时间过长,动态分配有可能在混洗结束前移除任务异常的执行器,而这些被移除的执行器对应的数据将会被重新计算,但这些重算其实是不必要的。 要解决这一问题,就需要用到一个外部混洗服务(external shuffle service),该服务在Spark 1.2引入。该服务在每个节点上都会启动一个不依赖于任何Spark应用或执行器的独立进程。一旦该服务启用,Spark执行器不再从各个执行器上获取shuffle文件,转而从这个service获取。这意味着,任何执行器输出的混洗状态数据都可能存留时间比对应的执行器进程还长。 除了混洗文件之外,执行器也会在磁盘或者内存中缓存数。一旦执行器被移除,其缓存数据将无法访问。这个问题目前还没有解决。或许在未来的版本中,可能会采用外部混洗服务类似的方法,将缓存数据保存在堆外存储中以解决这一问题。 Spark应用内部的资源调度 在指定的Spark应用内部(对应同一SparkContext实例),多个线程可能并发地提交Spark作业(job)。在本节中,作业(job)是指,由Spark action算子(如:collect)触发的一系列计算任务的集合。Spark调度器是完全线程安全的,并且能够支持Spark应用同时处理多个请求(比如:来自不同用户的查询)。 默认,Spark应用内部使用FIFO调度策略。每个作业被划分为多个阶段(stage)(例如:map阶段和reduce阶段),第一个作业在其启动后会优先获取所有的可用资源,然后是第二个作业再申请,再第三个……。如果前面的作业没有把集群资源占满,则后续的作业可以立即启动运行,否则,后提交的作业会有明显的延迟等待。 不过从Spark 0.8开始,Spark也能支持各个作业间的公平(Fair)调度。公平调度时,Spark以轮询的方式给每个作业分配资源,因此所有的作业获得的资源大体上是平均分配。这意味着,即使有大作业在运行,小的作业再提交也能立即获得计算资源而不是等待前面的作业结束,大大减少了延迟时间。这种模式特别适合于多用户配置。 要启用公平调度器,只需设置一下 SparkContext中spark.scheduler.mode 属性为 FAIR即可: val conf = new SparkConf().setMaster(...).setAppName(...) conf.set("spark.scheduler.mode", "FAIR") val sc = new SparkContext(conf) 公平调度资源池 公平调度器还可以支持将作业分组放入资源池(pool),然后给每个资源池配置不同的选项(如:权重)。这样你就可以给一些比较重要的作业创建一个“高优先级”资源池,或者你也可以把每个用户的作业分到一组,这样一来就是各个用户平均分享集群资源,而不是各个作业平分集群资源。Spark公平调度的实现方式基本都是模仿Hadoop Fair Scheduler来实现的。 默认情况下,新提交的作业都会进入到默认资源池中,不过作业对应于哪个资源池,可以在提交作业的线程中用SparkContext.setLocalProperty 设定 spark.scheduler.pool 属性。示例代码如下: // Assuming sc is your SparkContext variable sc.setLocalProperty("spark.scheduler.pool", "pool1") 一旦设好了局部属性,所有该线程所提交的作业(即:在该线程中调用action算子,如:RDD.save/count/collect 等)都会使用这个资源池。这个设置是以线程为单位保存的,你很容易实现用同一线程来提交同一用户的所有作业到同一个资源池中。同样,如果需要清除资源池设置,只需在对应线程中调用如下代码: sc.setLocalProperty("spark.scheduler.pool", null) 资源池默认行为 默认地,各个资源池之间平分整个集群的资源(包括default资源池),但在资源池内部,默认情况下,作业是FIFO顺序执行的。举例来说,如果你为每个用户创建了一个资源池,那么久意味着各个用户之间共享整个集群的资源,但每个用户自己提交的作业是按顺序执行的,而不会出现后提交的作业抢占前面作业的资源。 配置资源池属性 资源池的属性需要通过配置文件来指定。每个资源池都支持以下3个属性: schedulingMode:可以是FIFO或FAIR,控制资源池内部的作业是如何调度的。 weight:控制资源池相对其他资源池,可以分配到资源的比例。默认所有资源池的weight都是1。如果你将某个资源池的weight设为2,那么该资源池中的资源将是其他池子的2倍。如果将weight设得很高,如1000,可以实现资源池之间的调度优先级 – 也就是说,weight=1000的资源池总能立即启动其对应的作业。 minShare:除了整体weight之外,每个资源池还能指定一个最小资源分配值(CPU个数),管理员可能会需要这个设置。公平调度器总是会尝试优先满足所有活跃(active)资源池的最小资源分配值,然后再根据各个池子的weight来分配剩下的资源。因此,minShare属性能够确保每个资源池都能至少获得一定量的集群资源。minShare的默认值是0。 资源池属性是一个XML文件,可以基于 conf/fairscheduler.xml.template 修改,然后在SparkConf的 spark.scheduler.allocation.file 属性指定文件路径: conf.set("spark.scheduler.allocation.file", "/path/to/file") 资源池XML配置文件格式如下,其中每个池子对应一个<pool>元素,每个资源池可以有其独立的配置: <?xml version="1.0"?> <allocations> <pool name="production"> <schedulingMode>FAIR</schedulingMode> <weight>1</weight> <minShare>2</minShare> </pool> <pool name="test"> <schedulingMode>FIFO</schedulingMode> <weight>2</weight> <minShare>3</minShare> </pool> </allocations> 完整的例子可以参考 conf/fairscheduler.xml.template。注意,没有在配置文件中配置的资源池都会使用默认配置(schedulingMode:FIFO,weight:1,minShare:0)。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark官方文档》集群模式概览

集群模式概览 本文简要描述了Spark在集群中各个组件如何运行。想了解如何在集群中启动Spark应用,请参考application submission guide。 组件 Spark应用在集群上运行时,包括了多个独立的进程,这些进程之间通过你的主程序(也叫作驱动器,即:driver)中的SparkContext对象来进行协调。 特别要指出的是,SparkContext能与多种集群管理器通信(包括:Spark独立部署时自带的集群管理器,Mesos或者YARN)。一旦连接上集群管理器,Spark会为该应用在各个集群节点上申请执行器(executor),用于执行计算任务和存储数据。接下来,Spark将应用程序代码(JAR包或者Python文件)发送给所申请到的执行器。最后SparkContext将分割出的任务(task)发送给各个执行器去运行。 这个架构中有几个值得注意的地方: 每个Spark应用程序都有其对应的多个执行器进程,执行器进程在整个应用程序生命周期内,都保持运行状态,并以多线程方式运行所收到的任务。这样的好处是,可以隔离各个Spark应用,从调度角度来看,每个驱动器可以独立调度本应用程序内部的任务,从执行器角度来看,不同的Spark应用对应的任务将会在不同的JVM中运行。然而这种架构同样也有其劣势,多个Spark应用程序之间无法共享数据,除非把数据写到外部存储中。 Spark对底层的集群管理器一无所知。只要Spark能申请到执行器进程,并且能与之通信即可。这种实现方式可以使Spark相对比较容易在一个支持多种应用的集群管理器上运行(如:Mesos或YARN) 驱动器(driver)程序在整个生命周期内必须监听并接受其对应的各个执行器的连接请求(参考:spark.driver.port and spark.fileserver.port in the network config section)。因此,驱动器程序必须能够被所有worker节点访问到。 因为集群上的任务是由驱动器来调度的,所以驱动器应该和worker节点距离近一些,最好在同一个本地局域网中。如果你需要远程对集群发起请求,最好还是在驱动器节点上启动RPC服务,来响应这些远程请求,同时把驱动器本身放在集群worker节点比较近的机器上。 集群管理器类型 Spark支持以下3中集群管理器: Standalone– Spark自带的一个简单的集群管理器,这使得启动一个Spark集群变得非常简单。 Apache Mesos– 一种可以运行Hadoop MapReduce或者服务型应用的通用集群管理器。 Hadoop YARN– Hadoop 2的集群管理器。 另外,使用Spark的EC2 launch scripts可以轻松地在Amazon EC2上启动一个独立集群。 提交Spark应用 利用spark-submit脚本,可以向Spark所支持的任意一种集群提交应用。详见:application submission guide 监控 每一个驱动器(driver)都有其对应的web UI,默认会绑定4040端口(多个并存会按顺序绑定4041、4042…),这个web UI会展示该Spark应用正在运行的任务(task)、执行器(executor)以及所使用的存储信息。只需在浏览器种打开http://<driver-node>:4040即可访问。monitoring guide详细描述了其他监控选项。 作业调度 Spark可以在应用程序之间(集群管理器这一层面)和之内(如:同一个SparkContext对象运行了多个计算作业)控制资源分配。job scheduling overview描述了更详细的信息。 概念和术语 下表简要说明了集群模式下的一些概念和术语: 术语 含义 Application(应用) Spark上运行的应用。包含了驱动器(driver)进程(一个)和集群上的执行器(executor)进程(多个) Application jar(应用jar包) 包含Spark应用程序的jar包。有时候,用户会想要把应用程序代码及其依赖打到一起,形成一个“uber jar”(包含自身以及所有依赖库的jar包),注意这时候不要把Spark或Hadoop的库打进来,这些库会在运行时加载 Driver program(驱动器) 运行main函数并创建SparkContext的进程。 Cluster manager(集群管理器) 用于在集群上申请资源的 外部服务(如:独立部署的集群管理器、Mesos或者YARN) Deploy mode(部署模式) 用于区分驱动器进程在哪里运行。在”cluster”模式下,驱动器将运行在集群上某个节点;在”client“模式下,驱动器在集群之外的客户端运行。 Worker node(工作节点) 集群上运行应用程序代码的任意一个节点。 Executor(执行器) 在集群工作节点上,为某个应用启动的工作进程;专门用于运行计算任务,并在内存或磁盘上保存数据。每个应用都独享其对应的多个执行器。 Task(任务) 下发给执行器的工作单元。 Job(作业) 一个并行计算作业,由一组任务(Task)组成,并由Spark的行动(action)算子(如:save、collect)触发启动;你会在驱动器日志中看到这个术语。 Stage(步骤) 每个作业(Job)可以划分为更小的任务(Task)集合,这就是步骤(Stage),这些步骤彼此依赖形成一个有向无环图(类似于MapReduce中的map和reduce);你会在驱动器日志中看到这个术语。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

ChatGPT 官方中文页面上线

据 ChatGPT 页面显示,OpenAI 现已向用户推出 ChatGPT 多语言功能 Alpha 版测试,用户可以参与该测试并选择不同语言的界面。 如下图所示,ChatGPT 会检测系统当前使用的语言,并提醒用户进行切换。 用户也可以通过设置页面选择不同的语言,目前 OpenAI 提供中文、日语、法语、意大利语、葡萄牙语、德语、俄语等不同的语言选项。 需要注意的是,当前版本仅仅是交互页面显示中文,默认语言仍然是英文。

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Apache Tomcat

Apache Tomcat

Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。

Eclipse

Eclipse

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。