spark MapOutputTrackerMaster
最近用了一个RowNumber() over()函数 进行三张4000万数据的关联筛选,建表语句如下:
create table CiCustomerPortrait2 as SELECT ROW_NUMBER() OVER() as id,* from (select t_7.phone_no,t_7.L1301,t_7.L1431,t_7.L1449,t_7.L1489,t_8.L1713,t_92.L1879,t_92.L1907 from DW_COC_LABEL_INTERNET_D_20151123 t_7 inner join DW_COC_LABEL_INTERNET_M_201510 t_8 on t_7.phone_no = t_8.phone_no inner join DW_COC_LABEL_BITEMP_M_201510 t_92 on t_7.phone_no = t_92.phone_no ) a 一方面由于多表关联(每个表4000W以上数据)一方面窗口函数的原因异常吃内存。
spark-env.sh参数如下:(default我就不贴了)
给了18个executor,每个executor有12G内存,每台服务器启动3个core,那么每个core就是4G内存。但执行过程中,一直卡在这里:
查询相关资料。
Shuffle的数据如何拉取过来
作业提交的时候,DAGScheduler会把Shuffle的过程切分成map和reduce两个Stage(之前一直被我叫做shuffle前和shuffle后),map的中间结果是写入到本地硬盘的,而不是内存,所以对磁盘的读写要求非常高,(最好是固态硬盘比较快,本人亲自尝试,同样的性能参数下,固态硬盘会比普通磁盘快10倍。)默认是一个map的中间结果文件是M*R(M=map数量,R=reduce的数量),设置了spark.shuffle.consolidateFiles为true之后是R个文件,根据bucketId把要分到同一个reduce的结果写入到一个文件中。MapOutputTrackerWorker向MapOutputTrackerMaster获取shuffle相关的map结果信息。把map结果信息构造成BlockManagerId --> Array(BlockId, size)的映射关系,通过BlockManager的getMultiple批量拉取block。
当过了N久执行过去了后,将生成好的文件拷贝到hdfs相应路径下
最终将生成好的文件拷贝到目录下,整个耗时10多分钟。Spark beeline方式连接有个缺陷,如果你一个job执行的时间过长,就会卡在那里,即便执行完也卡在那里,这样项目中用jdbc连接的时候,程序也不会退出,一直等待着结束,造成程序无法继续向下执行。这个还要调整各方面参数想办法优化执行效率。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Hadoop MapReduce编程学习
一直在搞spark,也没时间弄hadoop,不过Hadoop基本的编程我觉得我还是要会吧,看到一篇不错的文章,不过应该应用于hadoop2.0以前,因为代码中有 conf.set("mapred.job.tracker","192.168.1.2:9001");新框架中已改为 Yarn-site.xml 中的 resouceManager 及 nodeManager 具体配置项,新框架中历史 job 的查询已从 Job tracker 剥离,归入单独的mapreduce.jobtracker.jobhistory 相关配置.mapred.job.tracker的主要用途在于合并map之后的中间文件,就如同spark的repatition函数吧,为了防止接下来shuffle所造成的RDD过多,合并下~ 转自:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html 1、数据去重 "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都...
- 下一篇
Spark RDDRelation
package main.asiainfo.coc.sparksql import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} case class Record(key: Int, value: String) object RDDRelation { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("RDDRelation").setMaster("local") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) //引入SQL context 提供所有的SQL 与 隐式转换方法 import sqlContext.implicits._ //生成1到100的数字,并做成key value形式的DateFrame val df = sc.parallelize((1 t...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7,CentOS8安装Elasticsearch6.8.6