Spark读取压缩文件
我的原创地址:https://dongkelun.com/2018/05/30/sparkGZ/
前言
本文讲如何用spark读取gz类型的压缩文件,以及如何解决我遇到的各种问题。
1、文件压缩
下面这一部分摘自Spark快速大数据分析:
在大数据工作中,我们经常需要对数据进行压缩以节省存储空间和网络传输开销。对于大多数Hadoop输出格式来说,我们可以指定一种压缩编解码器来压缩数据。
选择一个输出压缩编解码器可能会对这些数据以后的用户产生巨大影响。对于像Spark 这样的分布式系统,我们通常会尝试从多个不同机器上一起读入数据。要实现这种情况,每个工作节点都必须能够找到一条新记录的开端。有些压缩格式会使这变得不可能,而必须要单个节点来读入所有数据,这就很容易产生性能瓶颈。可以很容易地从多个节点上并行读取的格式被称为“可分割”的格式。下表列出了可用的压缩选项。
格式 | 可分割 | 平均压缩速度 | 文本文件压缩效率 | Hadoop压缩编解码器 | 纯Java实现 | 原生 | 备注 |
---|---|---|---|---|---|---|---|
gzip | 否 | 快 | 高 | org.apache.hadoop.io.compress.GzipCodec | 是 | 是 | |
lzo | 是(取决于所使用的库) | 非常快 | 中等 | com.hadoop.compression.lzo.LzoCodec | 是 | 是 | 需要在每个节点上安装LZO |
bzip2 | 是 | 慢 | 非常高 | org.apache.hadoop.io.compress.Bzip2Codec | 是 | 是 | 为可分割版本使用纯Java |
zlib | 否 | 慢 | 中等 | org.apache.hadoop.io.compress.DefaultCodec | 是 | 是 | Hadoop 的默认压缩编解码器 |
Snappy | 否 | 非常快 | 低 | org.apache.hadoop.io.compress.SnappyCodec | 否 | 是 | Snappy 有纯Java的移植版,但是在Spark/Hadoop中不能用 |
尽管Spark 的textFile() 方法可以处理压缩过的输入,但即使输入数据被以可分割读取的方式压缩,Spark 也不会打开splittable。因此,如果你要读取单个压缩过的输入,最好不要考虑使用Spark 的封装,而是使用newAPIHadoopFile 或者hadoopFile,并指定正确的压缩编解码器。
关于上面一段话的个人测试:选取一个大文件txt,大小为1.5G,写spark程序读取hdfs上的该文件然后写入hive,经测试在多个分区的情况下,txt执行时间最短,因为在多个机器并行执行,而gz文件是不可分割的,即使指定分区数目,但依然是一个分区,一个task,即在一个机器上执行,bzip2格式的文件虽然是可分割的,即可以按照指定的分区分为不同的task在多个机器上执行,但是执行时间长,比gz时间还长,经过四次改变bzip2的分区,发现最快的时间和gz时间是一样的,如果指定一个分区的话,比gz要慢很多,我想这样就可以更好的理解:”尽管Spark 的textFile() 方法可以处理压缩过的输入,但即使输入数据被以可分割读取的方式压缩,Spark 也不会打开splittable”这句话了。
后续测试:根据集群的cpu合理分配executor的个数的情况下,txt的时间缩短到1分钟,bzip2缩短到1.3分钟,而对gz重新分区(reparation)缩短到2分钟,可以看到在合理分配资源的情况下,bzip2比gz快不少,但依然赶不上txt,当然这也的结果可能受文件大小和集群资源的限制,所以根据自己的实际需求测试再决定用哪个即可。
2、代码
代码很简单,用textFile()即可,假设,我的数据名为data.txt.gz,我把它放在hdfs上的/tmp/dkl路径下那么代码为:
val path = "hdfs://ambari.master.com:8020/tmp/dkl/data.txt.gz" val data = sc.textFile(path)
注:把数据放在hdfs的命令为
hadoop fs -put data.tar.gz /tml/dkl
3、一些小问题
3.1 数据
首先造几个数据吧,先创建一个txt,名字为data.txt,内容如下
1 张三 上海 2018-05-25 2 张三 上海 2018-05-25 3 张三 上海 2018-05-25 4 张三 上海 2018-05-25 5 张三 上海 2018-05-25
3.2 如何压缩
那么如如何打包为gz格式的压缩文件呢,分两种
一、 在windows上打包,如果不想在Linux服务器上用命令打包,那么可以直接用windows上的软件打包(win上常见的zip,rar格式,spark是不支持的),我用7-zip软件压缩,大家可百度7-zip或直接在https://www.7-zip.org/下载安装,压缩格式选gzip即可。
二、 在Linux上压缩,可通过下面的命令
1、保留原文件
gzip –c data.txt > data.txt.gz
2、不保留原文件,默认生成的文件名为原文件名.gz,即data.txt.gz
gzip data.txt
压缩完了之后,跑一下程序测试一下
data.take(3).foreach(println)
1 张三 上海 2018-05-25 2 张三 上海 2018-05-25 3 张三 上海 2018-05-25
根据结果看没问题。
三、 说明
在Linux上用tar命令压缩,spark虽然可以读,但是第一行会有文件信息
tar -zcvf data.tar.gz data.txt
3.3 文件编码问题
别人给我的原文件是.rar,那我需要将其解压之后得到txt,然后按照上述方式压缩为.gz,然后上传到hdfs,进行代码测试,打印前几条发现乱码,查了一下发现原文件是gbk编码的,且sc.textFile()不能指定编码,只能读取utf8格式,其他格式就会乱码。
注意:因为实际情况下解压后的txt文件很大,windows是直接打不开的,所以不能通过打开文件修改编码的方法去解决。
3.3.1 构建测试gbk格式的文件
1、windows上可以用记事本打开,另存为,编码选择ANSI即可
2、Linux可以通过下面的命令修改
iconv -f utf8 -t gbk data.txt > data_gbk.txt
测试一下输出,发现确实乱码了(直接测试txt即可)
1 ���� �Ϻ� 2018-05-25 2 ���� �Ϻ� 2018-05-25 3 ���� �Ϻ� 2018-05-25
3.3.2 代码解决
通过如下代码测试即可
定义方法
import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext import org.apache.hadoop.io.LongWritable import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.io.Text def transfer(sc: SparkContext, path: String): RDD[String] = { sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1) .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK")) }
测试方法
transfer(sc, path3).take(3).foreach(println)
3.3.3 Linux命令
可直接通过Linux命令转换txt的编码格式,再压缩,这样代码就不用修改
其实在3.2.1中已经涉及到了
1、通过Linux自带的命令iconv
iconv不能覆盖原来的文件,只能生成新的文件之后,再通过mv命令去覆盖
iconv -f gbk -t utf8 data_gbk.txt > data_new.txt
2、通过enca
enca可以直接覆盖原来的文件,这样如果不想改变来的文件名,就少一步mv操作了,enca不是子系统自带的,需要自己下载安装,可在http://dl.cihar.com/enca/下载最新版本。
#下载&解压 wget http://dl.cihar.com/enca/enca-1.19.tar.gz tar -zxvf enca-1.19.tar.gz cd enca-1.19 #编译安装 ./configure make make install
安装好了之后通过下面的命令转换即可
enca -L zh_CN -x UTF-8 data_gbk.txt
转换编码格式之后,在通过程序测试即可。
3.4 rdd换df
由于文件过大,不能直接打开看也没用垃圾数据,造成格式问题,如果有垃圾数据,在rdd转df的过程中会产生异常,这里记录一下我碰见的问题。
1、首先可以先打印出前几行数据查看一下该文件的大体格式
2、碰到的一个一个异常
代码用的旧版spark(1.6版本) 将rdd动态转为dataframe里面的方法。
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true])....
原因是因为文件里有一行数据为垃圾数据,这行数据的列数和列名的个数不一样导致的,可以在代码中过滤掉这样数据即可。
.filter(_.length == colName.length)

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
大数据Hadoop:杀鸡用的宰牛刀
Hadoop是个庞大的重型解决方案,它的设计目标本来就是大规模甚至超大规模的集群,面对的是上百甚至上千个节点,这样就会带来两个问题: 自动化管理管任务分配机制:这样规模的集群,显然不大可能针对每个节点提供个性化的管理控制,否则工作量会大到累死人,必须采用自动化的管理和任务分配手段,而这并不是件简单的事情。 强容错能力:大规模集群在某个任务执行周期内,也就是几小时之内,都有可能发生设备故障。如果没有强容错能力可能任何任务都无法执行出来。而容错同样并不容易实现,同样非常消耗计算和存储资源。 而且,Hadoop的产品线丰富,这本来是好事情,但要把这些模块都放在一个平台上运行,还要梳理好各个模块之间的相互依赖性,就需要一个包罗万象的复杂框架,这也使得Hadoop体系显得很沉重。 但是,很多情况下,用户的数据并不总会有那么多。除了一些互联网巨头企业和国家级通信运营商及银行外,大多数用户的数据量并没有大到需要几百上千个节点才能处理的地步。而且,很多用户也只是为了常规的结构化数据运算(主要也就是SQL),用不着那么完整的产品线。 结果,我们经常看到的现象是:用户上了Hadoop,只有四个或八个节点,...
- 下一篇
开源的Trafodion如何实现事务与分析一体化?
Trafodion是Apache基金会的一个开源项目,提供了一个成熟的企业级SQL-on-HBase解决方案。Trafodion的主要设计思想是处理operational类型的工作负载,或者是传统的OLTP应用。此外,对于需要保证数据一致性、需要标准SQL开发接口,或者需要实时数据读写分析的应用,Trafodion也是一个十分合适的解决方案。 Trafodion的前世今生 Trafodion的渊源可以追溯到数据库技术的“史前时代”。 Trafodion的鼻祖是天腾 (Tandem) 公司的NonStop SQL。之后在1989年,天腾推出了NonStop SQL/MP,它是第一个MPP分布式数据库,实现海量并发SQL执行。在当时的历史条件下,NonStop SQL/MP开创性地提供了线性横向扩展能力(我们如今耳熟能详的scale out)。 1999年,在Graefe Goetz的帮助下,NonStop SQL/MX诞生了,它实现了基于成本的CBO SQL优化器和基于数据流的MPP SQL执行器。2002年,惠普公司和康柏公司合并,已被康柏收购的天腾也成为了惠普的一部分。2006年,N...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长