Apache Spark源码走读(九)如何进行代码跟读&使用Intellij idea调试Spark源码
<一>如何进行代码跟读
概要
今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读。众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着跟着就觉着线索跟丢掉了,另外Spark基于Akka来进行消息交互,那如何知道谁是接收方呢?
new Throwable().printStackTrace
代码跟读的时候,经常会借助于日志,针对日志中输出的每一句,我们都很想知道它们的调用者是谁。但有时苦于对spark系统的了解程度不深,或者对scala认识不够,一时半会之内无法找到答案,那么有没有什么简便的办法呢?
我的办法就是在日志出现的地方加入下面一句话
new Throwable().printStackTrace()
现在举一个实际的例子来说明问题。
比如我们在启动spark-shell之后,输入一句非常简单的sc.textFile("README.md"),会输出下述的log
14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489 14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB) 14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took 78 ms 14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took 79 ms res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13
那我很想知道是第二句日志所在的tryToPut函数是被谁调用的该怎么办?
办法就是打开MemoryStore.scala,找到下述语句
logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
在这句话之上,添加如下语句
new Throwable().printStackTrace()
然后,重新进行源码编译
sbt/sbt assembly
再次打开spark-shell,执行sc.textFile("README.md"),就可以得到如下输出,从中可以清楚知道tryToPut的调用者是谁
14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489 14/07/05 19:53:27 WARN MemoryStore: just show the calltrace by entering some modified code java.lang.Throwable at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:182) at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76) at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:699) at org.apache.spark.storage.BlockManager.put(BlockManager.scala:570) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:821) at org.apache.spark.broadcast.HttpBroadcast.(HttpBroadcast.scala:52) at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35) at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:787) at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:556) at org.apache.spark.SparkContext.textFile(SparkContext.scala:468) at $line5.$read$$iwC$$iwC$$iwC$$iwC.(:13) at $line5.$read$$iwC$$iwC$$iwC.(:18) at $line5.$read$$iwC$$iwC.(:20) at $line5.$read$$iwC.(:22) at $line5.$read.(:24) at $line5.$read$.(:28) at $line5.$read$.() at $line5.$eval$.(:7) at $line5.$eval$.() at $line5.$eval.$print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB) 14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took 78 ms 14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took 79 ms res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13
git同步
对代码作了修改之后,如果并不想提交代码,那该如何将最新的内容同步到本地呢?
git reset --hard git pull origin master
Akka消息跟踪
追踪消息的接收者是谁,相对来说比较容易,只要使用好grep就可以了,当然前提是要对actor model有一点点了解。
还是举个实例吧,我们知道CoarseGrainedSchedulerBackend会发送LaunchTask消息出来,那么谁是接收方呢?只需要执行以下脚本即可。
grep LaunchTask -r core/src/main
从如下的输出中,可以清楚看出CoarseGrainedExecutorBackend是LaunchTask的接收方,接收到该函数之后的业务处理,只需要去看看接收方的receive函数即可。
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala: case LaunchTask(data) => core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala: logError("Received LaunchTask command but executor was null") core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala: case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala: executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
小结
今天的内容相对简单,没有技术含量,自己做个记述,免得时间久了,不记得。
<二>使用Intellij idea调试Spark源码
概要
上篇博文讲述了如何通过修改源码来查看调用堆栈,尽管也很实用,但每修改一次都需要编译,花费的时间不少,效率不高,而且属于侵入性的修改,不优雅。本篇讲述如何使用intellij idea来跟踪调试spark源码。
前提
本文假设开发环境是在Linux平台,并且已经安装下列软件,我个人使用的是arch linux。
- jdk
- scala
- sbt
- intellij-idea-community-edition
安装scala插件
为idea安装scala插件,具体步骤如下:
- 选择File->Setting
2. 选择右侧的Install Jetbrains Plugin,在弹出窗口的左侧输入scala,然后点击安装,如下图所示:
3. scala插件安装结束,需要重启idea生效
由于idea 13已经原生支持sbt,所以无须为idea安装sbt插件。
源码下载和导入
下载源码,假设使用git同步最新的源码:
git clone https://github.com/apache/spark.git
导入Spark源码
1. 选择File->Import Project, 在弹出的窗口中指定spark源码目录
2. 选择项目类型为sbt project,然后点击next
3. 在新弹出的窗口中先选中"Use auto-import",然后点击Finish
导入设置完成,进入漫长的等待,idea会对导入的源码进行编译,同时会生成文件索引。
如果在提示栏出现如下的提示内容"is waiting for .sbt.ivy.lock",说明该lock文件无法创建,需要手工删除,具体操作如下:
cd $HOME/.ivy2 rm *.lock
手工删除掉lock之后,重启idea,重启后会继续上次没有完成的sbt过程。
源码编译
使用idea来编译spark源码,中间会有多次出错,问题的根源是sbt/sbt gen-idea的时候并没有很好的解决依赖关系。
解决办法如下,
1. 选择File->Project Structures
2. 在右侧dependencies中添加新的module
选择spark-core
其它模块如streaming-twitter, streaming-kafka, streaming-flume, streaming-mqtt出错的情况解决方案与此类似。
注意Example编译报错时的处理稍有不同,在指定Dependencies的时候,不是选择Library而是选择Module dependency,在弹出的窗口中选择sql.
有关编译出错问题的解决可以看一下这个链接,http://apache-spark-user-list.1001560.n3.nabble.com/Errors-occurred-while-compiling-module-spark-streaming-zeromq-IntelliJ-IDEA-13-0-2-td1282.html
调试LogQuery
1. 选择Run->Edit configurations
2. 添加Application,注意右侧窗口中配置项内容的填写,分别为Main class, vm options, working directory, use classpath of module
-Dspark.master=local 指定Spark的运行模式,可根据需要作适当修改。
3. 至此,在Run菜单中可以发现有"Run LogQuery"一项存在,尝试运行,保证编译成功。
4. 断点设置,在源文件的左侧双击即可打上断点标记,然后点击Run->"Debug LogQuery", 大功告成,如下图所示,可以查看变量和调用堆栈了。
参考

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Apache Spark源码走读(八)Graphx实现剖析&spark repl实现详解
<一>Graphx实现剖析 概要 图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架。Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情。 Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口。本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习。 Google为什么赢得了搜索引擎大战 当Google还在起步的时候,在搜索引擎领域,Yahoo正如日中天,红的发紫。显然,在Google面前的是一堵让人几乎没有任何希望的墙。 但世事难料,现在“外事问谷歌”成了不争的事实,Yahoo应也陪客了。 这种转换到底是如何形成的了,有一个因素是这样的,那就是Google发明了显著提高搜索准确率的PageRank算法。如果说PageRank算法的提出让谷歌牢牢站稳了搜索引擎大战的脚跟,这是毫不夸张的。 搜索引擎有几个要考虑的关键因素(个人观点而已)。 要想吸引用户,就必须要有出色的搜索准确率 有了用户,才能做广告投...
- 下一篇
Apache Spark源码走读(十一)浅谈mllib中线性回归的算法实现&Spark MLLib中拟牛顿法L-BFGS的源码实现
<一>浅谈mllib中线性回归的算法实现 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读。 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化 具体到线性回归来说,上述就转换为: 梯度下降法 那么如何求得损失函数的最优解,针对最小二乘法来说可以使用梯度下降法。 算法实现 随机梯度下降 正则化 如何解决这些问题呢?可以采用收缩方法(shrinkage method),收缩方法又称为正则化(regularization)。 主要是岭回归(ridge regression)和lasso回归。通过对最小二乘估计加 入罚约束,使某些系数的估计为0。 线性回归的代码实现 上面讲述了一些数学基础,在将这些数学理论用代码来实现的时候,最...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7