首页 文章 精选 留言 我的

精选列表

搜索[文档处理],共10015篇文章
优秀的个人博客,低调大师

Apache Storm 官方文档 —— 本地模式

本地模式是一种在本地进程中模拟 Storm 集群的工作模式,对于开发和测试拓扑很有帮助。在本地模式下运行拓扑与在集群模式下运行拓扑的方式很相似。 创建一个进程内的“集群”只需要使用LocalCluster类即可,例如: import backtype.storm.LocalCluster; LocalCluster cluster = new LocalCluster(); 随后,你就可以使用LocalCluster中的submitTopology方法来提交拓扑了。与StormSubmitter中相应的方法相似,submitTopology接收一个拓扑名称、拓扑配置以及拓扑对象作为输入参数。你也可以以拓扑名称为参数,使用killTopology方法来 kill 掉对应的拓扑。 使用以下语句关闭本地模式集群运行: cluster.shutdown(); 本地模式的常用配置 你可以在这里找到完整的配置项列表。以下是几个比较有用的配置项说明: Config.TOPOLOGY_MAX_TASK_PARALLELISM:该配置项设置了单个组件(bolt/spout)的线程数上限。生产环境下的拓扑往往含有很高的并行度(数百个线程),导致在本地模式下测试拓扑时会有较大的负载。这个配置项可以让你很容易地控制并行度。 Config.TOPOLOGY_DEBUG:此配置项设置为 true 时 Storm 会打印出 spout 或者 bolt 每一次发送消息的日志记录。这个功能对于调试拓扑很有用。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》Spark快速入门

快速入门 本教程是对Spark的一个快速简介。首先,我们通过Spark的交互式shell介绍一下API(主要是Python或Scala),然后展示一下如何用Java、Scala、Python写一个Spark应用。更完整参考看这里:programming guide 首先,请到Spark website下载一个Spark发布版本,以便后续方便学习。我们暂时还不会用到HDFS,所以你可以使用任何版本的Hadoop。 使用Spark shell交互式分析 基础 利用Spark shell 很容易学习Spark API,同时也Spark shell也是强大的交互式数据分析工具。Spark shell既支持Scala(Scala版本的shell在Java虚拟机中运行,所以在这个shell中可以引用现有的Java库),也支持Python。在Spark目录下运行下面的命令可以启动一个Spark shell: Scala Python ./bin/spark-shell Spark最主要的抽象概念是个分布式集合,也叫作弹性分布式数据集(Resilient Distributed Dataset –RDD)。RDD可以由Hadoop InputFormats读取HDFS文件创建得来,或者从其他RDD转换得到。下面我们就先利用Spark源代码目录下的README文件来新建一个RDD: scala> val textFile = sc.textFile("README.md") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 RDD有两种算子,action算子(actions)返回结果,transformation算子(transformations)返回一个新RDD。我们先来看一下action算子: scala> textFile.count() // Number of items in this RDD res0: Long = 126 scala> textFile.first() // First item in this RDD res1: String = # Apache Spark 再来看下如何使用transformation算子。我们利用filter这个transformation算子返回一个只包含原始文件子集的新RDD。 scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09 把这两个例子串起来,我们可以这样写: scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15 更多RDD算子 RDD action 和 transformation 算子可以做更加复杂的计算。下面的代码中,我们将找出文件中包含单词数最多的行有多少个单词: Scala Python scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15 首先,用一个map算子将每一行映射为一个整数,返回一个新RDD。然后,用reduce算子找出这个RDD中最大的单词数。map和reduce算组的参数都是scala 函数体(闭包),且函数体内可以使用任意的语言特性,或引用scala/java库。例如,我们可以调用其他函数。为了好理解,下面我们用Math.max作为例子: scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15 Hadoop上的MapReduce是大家耳熟能详的一种通用数据流模式。而Spark能够轻松地实现MapReduce流程: scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8 这个例子里,我使用了flatMap,map, andreduceByKey这几个transformation算子,把每个单词及其在文件中出现的次数转成一个包含(String,int)键值对的RDD,计算出每个单词在文件中出现的次数 scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...) 缓存 Spark同样支持把数据集拉到集群范围的内存缓存中。这对于需要重复访问的数据非常有用,比如:查询一些小而”热“(频繁访问)的数据集 或者 运行一些迭代算法(如 PageRank)。作为一个简单的示例,我们把 linesWithSpark 这个数据集缓存一下: Scala Python scala> linesWithSpark.cache() res7: spark.RDD[String] = spark.FilteredRDD@17e51082 scala> linesWithSpark.count() res8: Long = 19 scala> linesWithSpark.count() res9: Long = 19 用Spark来缓存一个100行左右的文件,看起来确实有点傻。但有趣的是,同样的代码可以用于缓存非常大的数据集,即使这些数据集可能分布在数十或数百个节点,也是一样。你可以用 bin/spark-shell 连到一个集群上来验证一下,更详细的请参考:programming guide. 独立的应用程序 假设我们想写一个独立的Spark应用程序。我们将快速的过一下一个简单的应用程序,分别用Scala(sbt编译),Java(maven编译)和Python。 Scala Java Python 首先用Scala新建一个简单的Spark应用 – 简单到连名字都叫SimpleApp.scala /* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } } 注意,应用程序需要定义一个main方法,而不是继承scala.App。scala.App的子类可能不能正常工作。 这个程序,统计了Spark README文件中分别包含‘a’和’b’的行数。注意,你需要把YOUR_SPARK_HOME换成你的Spark安装目录。与之前用spark-shell不同,这个程序有一个单独的SparkContext对象,我们初始化了这个SparkContext对象并将其作为程序的一部分。 我们把一个SparkConf对象传给SparkContext的构造函数,SparkConf对象包含了我们这个应用程序的基本信息和配置。 我们的程序需要依赖Spark API,所以我们需要包含一个sbt配置文件,simple.sbt,在这个文件里,我们可以配置Spark依赖项。这个文件同时也添加了Spark本身的依赖库: name := "Simple Project" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" 为了让sbt能正常工作,我们需要一个典型的目录结构来放SimpleApp.scala和simple.sbt程序。一旦建立好目录,我们就可以创建一个jar包,然后用spark-submit脚本运行我们的代码。 # Your directory layout should look like this $ find . . ./simple.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.10/simple-project_2.10-1.0.jar ... Lines with a: 46, Lines with b: 23 下一步 恭喜你!你的首个Spark应用已经跑起来了! 进一步的API参考,请看这里:Spark programming guide,或者在其他页面上点击“Programming Guides”菜单 如果想了解集群上运行应用程序,请前往:deployment overview 最后,Spark examples子目录下包含了多个示例,你可以这样来运行这些例子: # For Scala and Java, use run-example: ./bin/run-example SparkPi # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py # For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》Spark独立模式

Spark独立模式 Spark除了可以在Mesos和YARN集群上运行之外,还支持一种简单的独立部署模式。独立部署模式下,你既可以手工启动(手动运行master和workers),也可以利用我们提供的启动脚本(launch scripts)。同时,独立部署模式下,你可以在单机上运行这些程序,以方便测试。 Spark集群独立安装 要独立安装Spark,你只需要将编译好的Spark包复制到集群中每一个节点上即可。你可以下载一个编译好的Spark版本,也可以在这里自己编译一个版本(build it yourself)。 手动启动集群 执行以下命令,启动独立部署的master server: ./sbin/start-master.sh 一旦启动完成,master会打印出master URL(spark://HOST:PORT),后续worker需要用这个URL来连接master,写代码的时候SparkContext中的master参数也需要设置成这个master URL。你还可以在master的web UI(默认为http://localhost:8080)上查看master URL。 类似地,你可以通过以下命令,启动一个或多个worker节点,并将其连接到master: ./sbin/start-slave.sh <master-spark-URL> 启动一个worker以后,刷新一下master的web UI(默认为http://localhost:8080),你应该可以在这里看到一个新的节点。 最后,以下配置选项将会被传给master和worker: 参数 含义 -h HOST,--host HOST 监听的主机名 -i HOST,--ip HOST 监听的主机名(已经废弃,请使用-h 或者 –host) -p PORT,--port PORT 服务监听的端口(master节点默认7077,worker节点随机) --webui-port PORT web UI端口(master节点默认8080,worker节点默认8081) -c CORES,--cores CORES 单节点上,Spark应用能够使用的CPU core数上限(默认,等于CPU core的个数);仅worker节点有效 -m MEM,--memory MEM 单节点上,Spark应用能够使用的内存上限,格式为1000M 或者 2G(默认为,机器上所有内存减去1G);仅worker节点有效 -d DIR,--work-dir DIR 工作目录,同时job的日志也输出到该目录(默认:${SPAKR_HOME}/work);仅worker节点有效 --properties-file FILE 自定义Spark属性文件的加载路径(默认:conf/spark-defaults.conf) 集群启动脚本 要使用启动脚本,来启动一个Spark独立部署集群,首先你需要在Spark目录下创建一个文件conf/slaves,并且在文件中写入你需要作为worker节点启动的每一台机器主机名(或IP),每行一台机器。如果conf/slaves文件不存在,启动脚本会默认会用单机方式启动,这种方式对测试很有帮助。注意,master节点访问各个worker时使用ssh。默认情况下,你需要配置ssh免密码登陆(使用秘钥文件)。如果你没有设置免密码登陆,那么你也可以通过环境变量SPARK_SSH_FOREGROUND来一个一个地设置每个worker的密码。 设置好conf/slaves文件以后,你就可以用一下shell脚本来启动或停止集群了,类似于Hadoop的部署,这些脚本都在${SPARK_HOME}/sbin目录下: sbin/start-master.sh– 在本机启动一个master实例 sbin/start-slaves.sh– 在conf/slaves文件所指定的每一台机器上都启动一个slave实例 sbin/start-slave.sh– 在本机启动一个slave实例 sbin/start-all.sh– 启动一个master和多个slave实例,详细见上面的描述。 sbin/stop-master.sh– 停止 start-master.sh所启动的master实例 sbin/stop-slaves.sh– 停止所有在conf/slaves中指定的slave实例 sbin/stop-all.sh– 停止master节点和所有slave节点,详细见上面的描述 注意,这些脚本都需要在你启动Spark master的机器上运行,而不是你的本地机器。 Spark独立部署集群的其他可选的环境变量见conf/spark-env.sh。你可以通过复制conf/spark-env.sh.template来创建这个文件,同时你还需要将配置好的文件复制到所有的worker节点上。以下是可用的设置: 环境变量 含义 SPARK_MASTER_IP master实例绑定的IP地址,例如,绑定到一个公网IP SPARK_MASTER_PORT mater实例绑定的端口(默认7077) SPARK_MASTER_WEBUI_PORT master web UI的端口(默认8080) SPARK_MASTER_OPTS master专用配置属性,格式如”-Dx=y” (默认空),可能的选项请参考下面的列表。 SPARK_LOCAL_DIRS Spark的本地工作目录,包括:映射输出的临时文件和RDD保存到磁盘上的临时数据。这个目录需要快速访问,最好设成本地磁盘上的目录。也可以通过使用逗号分隔列表,将其设成多个磁盘上的不同路径。 SPARK_WORKER_CORES 本机上Spark应用可以使用的CPU core上限(默认所有CPU core) SPARK_WORKER_MEMORY 本机上Spark应用可以使用的内存上限,如:1000m,2g(默认为本机所有内存减去1GB);注意每个应用单独使用的内存大小要用 spark.executor.memory 属性配置的。 SPARK_WORKER_PORT Spark worker绑定的端口(默认随机) SPARK_WORKER_WEBUI_PORT worker web UI端口(默认8081) SPARK_WORKER_INSTANCES 每个slave机器上启动的worker实例个数(默认:1)。如果你的slave机器非常强劲,可以把这个值设为大于1;相应的,你需要设置SPARK_WORKER_CORES参数来显式地限制每个worker实例使用的CPU个数,否则每个worker实例都会使用所有的CPU。 SPARK_WORKER_DIR Spark worker的工作目录,包括worker的日志以及临时存储空间(默认:${SPARK_HOME}/work) SPARK_WORKER_OPTS worker的专用配置属性,格式为:”-Dx=y”,可能的选项请参考下面的列表。 SPARK_DAEMON_MEMORY Spark master和worker后台进程所使用的内存(默认:1g) SPARK_DAEMON_JAVA_OPTS Spark master和workers后台进程所使用的JVM选项,格式为:”-Dx=y”(默认空) SPARK_PUBLIC_DNS Spark master和workers使用的公共DNS(默认空) 注意:启动脚本目前不支持Windows。如需在Windows上运行,请手工启动master和workers。 SPARK_MASTER_OPTS支持以下属性: 属性名 默认值 含义 spark.deploy.retainedApplications 200 web UI上最多展示几个已结束应用。更早的应用的数将被删除。 spark.deploy.retainedDrivers 200 web UI上最多展示几个已结束的驱动器。更早的驱动器进程数据将被删除。 spark.deploy.spreadOut true 独立部署集群的master是否应该尽可能将应用分布到更多的节点上;设为true,对数据本地性支持较好;设为false,计算会收缩到少数几台机器上,这对计算密集型任务比较有利。 spark.deploy.defaultCores (无限制) Spark独立模式下应用程序默认使用的CPU个数(没有设置spark.cores.max的情况下)。如果不设置,则为所有可用CPU个数(除非设置了spark.cores.max)。如果集群是共享的,最好将此值设小一些,以避免用户占满整个集群。 spark.worker.timeout 60 如果master没有收到worker的心跳,那么将在这么多秒之后,master将丢弃该worker。 SPARK_WORKER_OPTS支持以下属性: 属性名 默认值 含义 spark.worker.cleanup.enabled false 是否定期清理 worker 和应用的工作目录。注意,该设置仅在独立模式下有效,YARN有自己的清理方式;同时,只会清理已经结束的应用对应的目录。 spark.worker.cleanup.interval 1800 (30 minutes) worker清理本地应用工作目录的时间间隔(秒) spark.worker.cleanup.appDataTtl 7 * 24 * 3600 (7 days) 清理多久以前的应用的工作目录。这个选项值将取决于你的磁盘总量。spark应用会将日志和jar包都放在其对应的工作目录下。随着时间流逝,应用的工作目录很快会占满磁盘,尤其是在你的应用提交比较频繁的情况下。 连接到集群 要在Spark集群上运行一个应用,只需把spark://IP:PORT这个master URL传给SparkContext(参考SparkContextconstructor) 如需要运行交互式的spark shell,运行如下命令: ./bin/spark-shell --master spark://IP:PORT 你也可以通过设置选线 –total-executor-cores <numCores> 来控制spark-shell在集群上使用的CPU总数。 启动Spark应用 spark-submit脚本(spark-submitscript)是提交spark应用最简洁的方式。对于独立安装的集群来说,spark目前支持两种运行模式。客户端(client)模式下,驱动器进程(driver)将在提交应用的机器上启动。而在集群(cluster)模式下,驱动器(driver)将会在集群中的某一台worker上启动,同时提交应用的客户端在提交动作完成之后立即退出,而不会等到Spark应用运行结束。 如果你的应用时通过spark-submit提交启动的,那么应用对应的jar包会自动发布到所有的worker节点上。任何额外的依赖项jar包,都必须在–jars参数中指明,并以逗号分隔(如:–jars jar1,jar2)。 另外,独立安装的集群还支持异常退出(返回值非0)时自动重启。要启用这个特性,你需要在spark-submit时指定–supervise参数。其后,如果你需要杀掉一个重复失败的应用,你可能需要运行如下指令: ./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID> 你可以在master web UI(http://<master url>:8080)上查看驱动器ID。 资源调度 独立安装集群目前只支持简单的先进先出(FIFO)调度器。这个调度器可以支持多用户,你可以控制每个应用所使用的最大资源。默认情况下,Spark应用会申请集群中所有的CPU,这不太合理,除非你的进群同一时刻只运行一个应用。你可以通过SparkConf中的spark.cores.max,来设置一个CPU帽子以限制其使用的CPU总数。例如: val conf = new SparkConf() .setMaster(...) .setAppName(...) .set("spark.cores.max", "10") val sc = new SparkContext(conf) 另外,你也可以通过conf/spark-env.sh中的spark.deploy.defaultCores设置应用默认使用的CPU个数(特别针对没有设置spark.cores.max的应用)。 export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>" 在一些共享的集群上,用户很可能忘记单独设置一个最大CPU限制,那么这个参数将很有用。 监控和日志 Spark独立安装模式提供了一个基于web的集群监控用户界面。master和每个worker都有其对应的web UI,展示集群和Spark作业的统计数据。默认情况下,你可以在master机器的8080端口上访问到这个web UI。这个端口可以通过配置文件或者命令行来设置。 另外,每个作业的详细日志,将被输出到每个slave节点上的工作目录下(默认为:${SPARK_HOME}/work)。每个Spark作业下都至少有两个日志文件,stdout和stderr,这里将包含所有的输出到控制台的信息。 和Hadoop同时运行 你可以让Spark和已有的Hadoop在同一集群上同时运行,只需要将Spark作为独立的服务启动即可。这样Spark可以通过hdfs:// URL来访问Hadoop上的数据(通常情况下是,hdfs://<namenode>:9000/path,你可以在Hadoop Namenode的web UI上找到正确的链接)。当然,你也可以为Spark部署一个独立的集群,这时候Spark仍然可以通过网络访问HDFS上的数据;这会比访问本地磁盘慢一些,但如果Spark和Hadoop集群都在同一个本地局域网内的话,问题不大(例如,你可以在Hadoop集群的每个机架上新增一些部署Spark的机器)。 网络安全端口配置 Spark会大量使用网络资源,而有些环境会设置严密的防火墙设置,以严格限制网络访问。完整的端口列表,请参考这里:security page. 高可用性 默认情况下,独立调度的集群能够容忍worker节点的失败(在Spark本身来说,它能够将失败的工作移到其他worker节点上)。然而,调度器需要master做出调度决策,而这(默认行为)会造成单点失败:如果master挂了,任何应用都不能提交和调度。为了绕过这个单点问题,我们有两种高可用方案,具体如下: 基于Zookeeper的热备master 概要 利用Zookeeper来提供领导节点选举以及一些状态数据的存储,你可以在集群中启动多个master并连接到同一个Zookeeper。其中一个将被选举为“领导”,而另一个将处于备用(standby)状态。如果“领导”挂了,则另一个master会立即被选举,并从Zookeeper恢复已挂“领导”的状态,并继续调度。整个恢复流程(从“领导”挂开始计时)可能需要1到2分钟的时间。注意,整个延时只会影响新增应用 – 已经运行的应用不会受到影响。 更多关于Zookeeper信息请参考这里:here 配置 要启用这种恢复模式,你可以在spark-env中设置SPARK_DAEMON_JAVA_OPTS,可用的属性如下: 系统属性 含义 spark.deploy.recoveryMode 设为ZOOKEEPER以启用热备master恢复模式(默认空) spark.deploy.zookeeper.url Zookeeper集群URL(如:192.168.1.100:2181,192.168.1.101:2181) spark.deploy.zookeeper.dir 用于存储可恢复状态的Zookeeper目录(默认 /spark) 可能的问题:如果你有多个master,但没有正确设置好master使用Zookeeper的配置,那么这些master彼此都不可见,并且每个master都认为自己是“领导”。这件会导致整个集群处于不稳定状态(多个master都会独立地进行调度) 详细 如果你已经有一个Zookeeper集群,那么启动高可用特性是很简单的。只需要在不同节点上启动多个master,并且配置相同的Zookeeper(包括Zookeeper URL和目录)即可。masters可以随时添加和删除。 在调度新提交的Spark应用或者新增worker节点时,需要知道当前”领导“的IP地址。你只需要将以前单个的master地址替换成多个master地址即可。例如,你可以在SparkContext中设置master URL为spark://host1:port1.host2:port2。这会导致SparkContext在两个master中都进行登记 – 那么这时候,如果host1挂了,这个应用的配置同样可以在新”领导“(host2)中找到。 ”在master注册“和普通操作有一个显著的区别。在Spark应用或worker启动时,它们需要找当前的”领导“master,并在该master上注册。一旦注册成功,它们的状态将被存储到Zookeeper上。如果”老领导“挂了,”新领导“将会联系所有之前注册过的Spark应用和worker并通知它们领导权的变更,所以Spark应用和worker在启动时甚至没有必要知道”新领导“的存在。 由于这一特性,新的master可以在任何时间添加进来,你唯一需要关注的就是,新的应用或worker能够访问到这个master。总之,只要应用和worker注册成功,其他的你都不用管了。 基于本地文件系统的单点恢复 概要 利用Zookeeper当然是生成环境下高可用的最佳选择,但有时候你仍然希望在master挂了的时候能够重启之,FILESYSTEM模式能帮你实现这一需求。当应用和worker注册到master的时候,他们的状态都将被写入文件系统目录中,一旦master挂了,你只需要重启master,这些状态都能够恢复。 配置 要使用这种恢复模式,你需要在spark-env中设置SPARK_DAEMON_JAVA_OPTS,可用的属性如下: 系统属性 含义 spark.deploy.recoveryMode 设为FILESYSTEM以启用单点恢复模式(默认空) spark.deploy.recoveryDirectory 用于存储可恢复状态数据的目录,master进程必须有访问权限 Details(详细) 这个解决方案可以用于和一些监控、管理系统进行串联(如:monit),或者与手动重启相结合。 至少,基于文件系统工单恢复总比不能恢复强;同时,这种恢复模式也会是开发或者实验场景下的不错的选择。在某些情况下,通过stop-master.sh杀死master可能不会清理其状态恢复目录下的数据,那么这时候你启动或重启master,将会进入恢复模式。这可能导致master的启动时间长达一分钟(master可能要等待之前注册的worker和客户端超时)。 你也可以使用NFS目录来作为数据恢复目录(虽然这不是官方声明支持的)。如果老的master挂了,你可以在另一个节点上启动master,这个master只要能访问同一个NFS目录,它就能够正确地恢复状态数据,包括之前注册的worker和应用(等价于Zookeeper模式)。后续的应用必须使用新的master来进行注册。 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark官方文档》集群模式概览

集群模式概览 本文简要描述了Spark在集群中各个组件如何运行。想了解如何在集群中启动Spark应用,请参考application submission guide。 组件 Spark应用在集群上运行时,包括了多个独立的进程,这些进程之间通过你的主程序(也叫作驱动器,即:driver)中的SparkContext对象来进行协调。 特别要指出的是,SparkContext能与多种集群管理器通信(包括:Spark独立部署时自带的集群管理器,Mesos或者YARN)。一旦连接上集群管理器,Spark会为该应用在各个集群节点上申请执行器(executor),用于执行计算任务和存储数据。接下来,Spark将应用程序代码(JAR包或者Python文件)发送给所申请到的执行器。最后SparkContext将分割出的任务(task)发送给各个执行器去运行。 这个架构中有几个值得注意的地方: 每个Spark应用程序都有其对应的多个执行器进程,执行器进程在整个应用程序生命周期内,都保持运行状态,并以多线程方式运行所收到的任务。这样的好处是,可以隔离各个Spark应用,从调度角度来看,每个驱动器可以独立调度本应用程序内部的任务,从执行器角度来看,不同的Spark应用对应的任务将会在不同的JVM中运行。然而这种架构同样也有其劣势,多个Spark应用程序之间无法共享数据,除非把数据写到外部存储中。 Spark对底层的集群管理器一无所知。只要Spark能申请到执行器进程,并且能与之通信即可。这种实现方式可以使Spark相对比较容易在一个支持多种应用的集群管理器上运行(如:Mesos或YARN) 驱动器(driver)程序在整个生命周期内必须监听并接受其对应的各个执行器的连接请求(参考:spark.driver.port and spark.fileserver.port in the network config section)。因此,驱动器程序必须能够被所有worker节点访问到。 因为集群上的任务是由驱动器来调度的,所以驱动器应该和worker节点距离近一些,最好在同一个本地局域网中。如果你需要远程对集群发起请求,最好还是在驱动器节点上启动RPC服务,来响应这些远程请求,同时把驱动器本身放在集群worker节点比较近的机器上。 集群管理器类型 Spark支持以下3中集群管理器: Standalone– Spark自带的一个简单的集群管理器,这使得启动一个Spark集群变得非常简单。 Apache Mesos– 一种可以运行Hadoop MapReduce或者服务型应用的通用集群管理器。 Hadoop YARN– Hadoop 2的集群管理器。 另外,使用Spark的EC2 launch scripts可以轻松地在Amazon EC2上启动一个独立集群。 提交Spark应用 利用spark-submit脚本,可以向Spark所支持的任意一种集群提交应用。详见:application submission guide 监控 每一个驱动器(driver)都有其对应的web UI,默认会绑定4040端口(多个并存会按顺序绑定4041、4042…),这个web UI会展示该Spark应用正在运行的任务(task)、执行器(executor)以及所使用的存储信息。只需在浏览器种打开http://<driver-node>:4040即可访问。monitoring guide详细描述了其他监控选项。 作业调度 Spark可以在应用程序之间(集群管理器这一层面)和之内(如:同一个SparkContext对象运行了多个计算作业)控制资源分配。job scheduling overview描述了更详细的信息。 概念和术语 下表简要说明了集群模式下的一些概念和术语: 术语 含义 Application(应用) Spark上运行的应用。包含了驱动器(driver)进程(一个)和集群上的执行器(executor)进程(多个) Application jar(应用jar包) 包含Spark应用程序的jar包。有时候,用户会想要把应用程序代码及其依赖打到一起,形成一个“uber jar”(包含自身以及所有依赖库的jar包),注意这时候不要把Spark或Hadoop的库打进来,这些库会在运行时加载 Driver program(驱动器) 运行main函数并创建SparkContext的进程。 Cluster manager(集群管理器) 用于在集群上申请资源的 外部服务(如:独立部署的集群管理器、Mesos或者YARN) Deploy mode(部署模式) 用于区分驱动器进程在哪里运行。在”cluster”模式下,驱动器将运行在集群上某个节点;在”client“模式下,驱动器在集群之外的客户端运行。 Worker node(工作节点) 集群上运行应用程序代码的任意一个节点。 Executor(执行器) 在集群工作节点上,为某个应用启动的工作进程;专门用于运行计算任务,并在内存或磁盘上保存数据。每个应用都独享其对应的多个执行器。 Task(任务) 下发给执行器的工作单元。 Job(作业) 一个并行计算作业,由一组任务(Task)组成,并由Spark的行动(action)算子(如:save、collect)触发启动;你会在驱动器日志中看到这个术语。 Stage(步骤) 每个作业(Job)可以划分为更小的任务(Task)集合,这就是步骤(Stage),这些步骤彼此依赖形成一个有向无环图(类似于MapReduce中的map和reduce);你会在驱动器日志中看到这个术语。 转载自并发编程网 - ifeve.com

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

WebStorm

WebStorm

WebStorm 是jetbrains公司旗下一款JavaScript 开发工具。目前已经被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。与IntelliJ IDEA同源,继承了IntelliJ IDEA强大的JS部分的功能。

用户登录
用户注册