Spark-神奇的共享变量
广播 变 量
广播 变 量允 许 程序 员缓 存一个只 读 的 变 量在每台机器上面,而不是每个任 务 保存一份拷 贝 。例如,利用广播 变 量,我 们 能 够 以一种更有效率的方式将一个大数据量 输 入集合的副本分配给 每个 节 点。(Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.)Spark也尝试 着利用有效的广播算法去分配广播 变 量,以 减 少通信的成本。一个广播变量可以通过调用 SparkContext.broadcast(v) 方法从一个初始变量v中创建。广播变量是v的一个包装 变 量,它的 值 可以通 过 value 方法 访问 ,下面的代 码说 明了 这 个 过 程:
<span style="font-size:24px;"> scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)</span>
广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量 v ,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。
累加器
顾 名思 义 ,累加器是一种只能通 过 关 联 操作 进 行“加”操作的 变 量,因此它能 够 高效的 应 用于并行操作中。它们能够用来实现 counters 和 sums 。Spark原生支持数值类型的累加器,开发者 可以自己添加支持的类型。 如果创建了一个具名的累加器,它可以在spark的UI中显示。这对 于理解 运 行 阶 段(running stages)的 过 程有很重要的作用。(注意: 这 在python中 还 不被支 持) 一个累加器可以通 过调 用 SparkContext.accumulator(v) 方法从一个初始 变 量 v 中 创 建。 运 行在 集群上的任 务 可以通 过 add 方法或者使用 += 操作来 给 它加 值 。然而,它 们 无法 读 取 这 个 值 。只有 驱动 程序可以使用 value 方法来 读 取累加器的 值 。 如下的代 码 ,展示了如何利用累 加器将一个数 组 里面的所有元素相加:
<span style="font-size:24px;">scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10</span>
这 个例子利用了内置的整数 类 型累加器。开 发 者可以利用子 类 AccumulatorParam 创 建自己的 累加器 类 型。AccumulatorParam接口有 两 个方法: zero 方法 为 你的数据 类 型提供一个“0 值 ”(zero value); addInPlace 方法 计 算 两 个 值 的和。例如,假 设 我 们 有一个 Vector 类 代表 数学上的向量,我 们 能 够 如下定 义 累加器:
<span style="font-size:24px;">object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)</span>
在scala中,Spark支持用更一般的Accumulable接口来累积数据-结果类型和用于累加的元素类 型 不一 样 (例如通 过 收集的元素建立一个列表)。Spark也支持用 SparkContext.accumulableCollection 方法累加一般的scala集合类型。
从 spark 官方网站 查 看一些 spark 运 行例子。 另 外, Spark 的 example 目 录 包含几个 Spark 例子,你能 够 通 过 如下方式 运 行 Java 或者 scala 例子:./bin/run-example SparkPi
为 了 优 化你的 项 目, configuration 和 tuning 指南提高了最佳实 践的信息保 证你保存在内存 中的数据是有效的格式是非常重要的事情。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark-rdd的持久化
Spark 最重要的一个功能是它可以通 过 各种操作( operations )持久化(或者 缓 存)一个集合到内存中。当你持久化一个 RDD 的 时 候,每一个 节 点都将参与 计 算的所有分区数据存 储 到内存中,并且 这 些 数据可以被 这 个集合(以及 这 个集合衍生的其他集合)的 动 作( action )重复利用。 这 个能力使后 续 的 动 作速度更快(通常快 10 倍以上)。 对应 迭代算法和快速的交互使用来 说 , 缓 存是一个关 键 的工具。你能通 过 persist() 或者 cache() 方法持久化一个 rdd 。首先,在 action 中 计 算得到 rdd ;然后,将其保存在每个 节 点的内存中。Spark的 缓 存是一个容 错 的技 术 -如果RDD的任何一个分区 丢 失,它 可以通 过 原有的 转换 ( transformations )操作自 动 的重复 计 算并且 创 建出 这 个分区。此外,我 们 可以利用不同的存 储级别 存 储 每一个被持久化的RDD。例如,它允 许 我 们 持久化 集合到磁盘上、将集合作为序列化的 Java 对象持久化到内存...
- 下一篇
基于超出内存可加载范围的数据集的逻辑回归分类器LR分类器
假如你想创建一个机器学习模型,但却发现你的输入数据集与你的计算机内存不相符?对于多机器的计算集群环境中通常可以使用如Hadoop和Apache Spark分布式计算工具。然而,Apache Spark能够在本地机器独立模式上,甚至在当输入数据集大于你的计算机内存时通过创建模型处理你的数据。 1.输入数据和预期结果 在上一篇文章我们讨论了“How To Find Simple And Interesting Multi-Gigabytes Data Set”,本文将使用上文中提及数据集的Posts.xml文件。文件大小是34.6千兆字节,这个xml文件包含stackoverflow.com文章数据作为xml属性: 标题 – 文章标题 主体 – 文章文本 标签 – 文章的标签列表 10+ 更多的xml -我们不需要使用的属性 关于stackoverflow.com的Posts.xml完整数据集信息请点击:https://archive.org/details/stackexchange. 另外我创建一个较小版本的这种文件,里面只有10个条目或文章。此文件包含一个小尺寸的原始数据集,这个数据...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Linux系统CentOS6、CentOS7手动修改IP地址
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Windows10,CentOS7,CentOS8安装Nodejs环境
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果