《深入理解Spark:核心思想与源码分析》——1.2节Spark初体验

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第1章,第1.2节Spark初体验,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

1.2 Spark初体验
本节通过Spark的基本使用,让读者对Spark能有初步的认识,便于引导读者逐步深入学习。
1.2.1 运行spark-shell
要运行spark-shell,需要先对Spark进行配置。
1)进入Spark的conf文件夹:
cd ~/install/spark-1.2.0-bin-hadoop1/conf
2)复制一份spark-env.sh.template,命名为spark-env.sh,对它进行编辑,命令如下:
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
3)添加如下配置:
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
4)启动spark-shell:
cd ~/install/spark-1.2.0-bin-hadoop1/bin
./spark-shell
最后我们会看到spark启动的过程,如图1-3所示。


039140b89848760d181f78c6b2350135567d03c8

从以上启动日志中我们可以看到SparkEnv、MapOutputTracker、BlockManagerMaster、DiskBlockManager、MemoryStore、HttpFileServer、SparkUI等信息。它们是做什么的?此处望文生义即可,具体内容将在后边的章节详细讲解。
1.2.2 执行word count
这一节,我们通过word count这个耳熟能详的例子来感受下Spark任务的执行过程。启动spark-shell后,会打开scala命令行,然后按照以下步骤输入脚本。
1)输入val lines = sc.textFile("../README.md", 2),执行结果如图1-4所示。


b4568b98c806240f7d6ac1983c076f16059a7b40


55c543774d837acece31a958e0e10626770f658f


0eea5cc88d3158f3f1bbaefeefb07f04b3eb0ebc

5)输入counts.foreach(println),任务执行过程如图1-8和图1-9所示。输出结果如图1-10所示。


4d84cbcd67c1f133e0613b86dc7e3b737af548d8


c7cfa8cc8d2165f45491cf6b6ca0e02866a4518c

在这些输出日志中,我们先是看到Spark中任务的提交与执行过程,然后看到单词计数的输出结果,最后打印一些任务结束的日志信息。有关任务的执行分析,笔者将在第5章中展开。
1.2.3 剖析spark-shell
通过word count在spark-shell中执行的过程,我们想看看spark-shell做了什么。spark-shell中有以下一段脚本,见代码清单1-1。
代码清单1-1 spark-shell中的一段脚本

function main() {
    if $cygwin; then
stty -icanonmin 1 -echo > /dev/null 2>&1
        export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
        "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
sttyicanon echo > /dev/null 2>&1
    else
        export SPARK_SUBMIT_OPTS
        "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
fi
}
我们看到脚本spark-shell里执行了spark-submit脚本,打开spark-submit脚本,发现其中包含以下脚本。
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
脚本spark-submit在执行spark-class脚本时,给它增加了参数SparkSubmit。打开spark-class脚本,其中包含以下脚本,见代码清单1-2。
代码清单1-2 spark-class
if [ -n "${JAVA_HOME}" ]; then
    RUNNER="${JAVA_HOME}/bin/java"
else
    if [ `command -v java` ]; then
        RUNNER="java"
    else
       echo "JAVA_HOME is not set" >&2
       exit 1
    fi
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

读到这里,应该知道Spark启动了以SparkSubmit为主类的jvm进程。
为便于在本地对Spark进程使用远程监控,给spark-class脚本追加以下jmx配置:

JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

在本地打开jvisualvm,添加远程主机,如图1-11所示。
右击已添加的远程主机,添加JMX连接,如图1-12所示。

单击右侧的“线程”选项卡,选择main线程,然后单击“线程Dump”按钮,如图1-13所示。
从dump的内容中找到线程main的信息,如代码清单1-3所示。


348e7b6674f0ac5deab2b81c9fc58a9c02ecb3e9

代码清单1-3 main线程dump信息

"main" - Thread t@1
    java.lang.Thread.State: RUNNABLE
        at java.io.FileInputStream.read0(Native Method)
        at java.io.FileInputStream.read(FileInputStream.java:210)
        at scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)
        at scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)
        at scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.
        java:933)
        at scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)
        at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)
        at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)
        at org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.
        scala:80)
        at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(Interactive-
        Reader.scala:43)
        at org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25)
        at org.apache.spark.repl.SparkILoop.readOneLine$1(SparkILoop.scala:619)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp 
        (SparkI-Loop.scala:968)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.
        scala:916)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.
        scala:916)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClass
        Loader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
        java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces-
        sorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
从main线程的栈信息中可看出程序的调用顺序:SparkSubmit.main→repl.Main→SparkI-Loop.process。SparkILoop.process方法中会调用initializeSpark方法,initializeSpark的实现见代码清单1-4。
代码清单1-4 initializeSpark的实现
def initializeSpark() {
intp.beQuietDuring {
    command("""
        @transient val sc = {
            val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
            println("Spark context available as sc.")
            _sc
        }
        """)
        command("import org.apache.spark.SparkContext._")
    }
}
我们看到initializeSpark调用了createSparkContext方法,createSparkContext的实现见代码清单1-5。
代码清单1-5 createSparkContext的实现
def createSparkContext(): SparkContext = {
valexecUri = System.getenv("SPARK_EXECUTOR_URI")
valjars = SparkILoop.getAddedJars
valconf = new SparkConf()
    .setMaster(getMaster())
    .setAppName("Spark shell")
    .setJars(jars)
    .set("spark.repl.class.uri", intp.classServer.uri)
if (execUri != null) {
                      conf.set("spark.executor.uri", execUri)
    }
sparkContext = new SparkContext(conf)
    logInfo("Created spark context..")
    sparkContext
}

这里最终使用SparkConf和SparkContext来完成初始化,具体内容将在第3章讲解。代码分析中涉及的repl主要用于与Spark实时交互。

优秀的个人博客,低调大师

微信关注我们

原文链接:https://yq.aliyun.com/articles/107732

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Apache Tomcat7、8、9(Java Web服务器)

Apache Tomcat7、8、9(Java Web服务器)

Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。

Java Development Kit(Java开发工具)

Java Development Kit(Java开发工具)

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。

Sublime Text 一个代码编辑器

Sublime Text 一个代码编辑器

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。