Hadoop学习笔记(一):MapReduce的输入格式
Hadoop学习有一段时间了,但是缺乏练手的项目,老是学了又忘。想想该整理一个学习笔记啥的,这年头打字比写字方便。果断开博客,咩哈哈~~
开场白结束(木有文艺细胞)
默认的MapReduce作业
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapRunner; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.lib.HashPartitioner; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MinimalMapReducewithDefaults extends Configured implements Tool { @Override public int run(String[] arg0) throws Exception { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), arg0); if (conf == null) { return -1; } conf.setInputFormat(TextInputFormat.class); conf.setNumMapTasks(1); conf.setMapperClass(IdentityMapper.class); conf.setMapRunnerClass(MapRunner.class); conf.setMapOutputKeyClass(LongWritable.class); conf.setMapOutputValueClass(Text.class); conf.setPartitionerClass(HashPartitioner.class); conf.setReducerClass(IdentityReducer.class); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); JobClient.runJob(conf); return 0; } /** * @param args */ public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MinimalMapReducewithDefaults(), args); System.exit(exitCode); //hdfs://192.168.174.128:9000/user/root/input/Temperature.txt hdfs://192.168.174.128:9000/user/root/output/test00001 } public static class JobBuilder { public static JobConf parseInputAndOutput(Tool tool, Configuration conf, String[] args) { if (args.length != 2) { printUsage(tool, "<inout> <output>"); return null; } JobConf jobConf = new JobConf(conf, tool.getClass()); FileInputFormat.addInputPath(jobConf, new Path(args[0])); FileOutputFormat.setOutputPath(jobConf, new Path(args[1])); return jobConf; } public static void printUsage(Tool tool, String extraArgsUsage) { System.err.printf("Usage: % [genericOptions] %s\n\n", tool .getClass().getSimpleName(), extraArgsUsage); } } }
这些个默认设置都不需要显式的设置,但是需要知道默认设置的是啥,贴出来省的忘记了。要注意的是:
1) conf.setInputFormat(TextInputFormat.class); 默认的输入格式;
2) conf.setMapOutputKeyClass(LongWritable.class); conf.setMapOutputValueClass(Text.class); 默认的map输出格式,其实就是将原样输出而已;
3) conf.setPartitionerClass(HashPartitioner.class); 默认的分区函数,没有特殊操作都会使用这个了。具体算法是:return (key.hashCode()&Integer.Max_VALUE)%numPartitions;
4) conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class);还是原样输出的格式。
输入分片
一个输入分片(split)是指由单个map处理的输入块,每个map操作之处理一个输入分片。
包含一个一字节为单位的长度和一组存储位置(即一组主机名)。
inputsplit由inputformat创建。inputformat负责产生输入分片并将它们分割成记录。
FileInputFormat类
所有使用文件作为其数据源的inputformat实现的基类。
提供两个功能: 1)定义哪些文件包含在一个作业的输入中;
2)未输入文件生成分片的实现。
输入分片大小: 由最小分片大小、最大分片大小、块大小决定。分片大小在[最小分片大小,最大分片大小]区间内,且取最接近块大小的值。
FileInputFormat的子类:TextInputFormat(默认类型,键是LongWritable类型,值为Text类型,key为当前行在文件中的偏移量,value为当前行本身);
KeyValueTextInputFormat(适合文件自带key,value的情况,只要指定分隔符即可,比较实用);
NLineInputFormat(key为当前行在文件中的偏移量,value为当前行本身,和TextInputFormat不同的是这个类型会为每个mapper指定固定行数的输入分片,N为每个mapper收到的输入行数;
mapred.line.input.format.linespermap属性控制N的值);
SequenceFileInputFormat(使用sequencefile作为map的输入)。
DBInputFormat类
数据库输入,在map中使用jdbc操作数据库,由于多个map将并发操作,故最好用于加载小量的数据集。操作数据库一般使用sqoop。
TextOutputFormat类
把每条记录写为文本行,每个键值使用制表符分割,(当然也可以使用mapred.textoutputformat.separator属性改变默认的分隔符)与TextOutputFormat对应的输入格式是keyValueTextInputFormat。可以用NullWritable来省略输出的键或者值(或者两个都省略,即相当于
NullOutputFormat)
SequenceFileOutputFormat类
即以SequenceFile的格式输出,如果输出需要作为后续MapReduce作业的输入,这是一种很好的输出格式。格式紧凑,容易被压缩。
(昨天弄到太晚了,代码写完直接睡了,今天上。。。)
MultipleOutputFormat类
这个类可以将数据写到多个文件中,比较实用。比如将输出的数据按一定的逻辑归类到不同文件中。(通常是按照输出的键或者值中的信息归类)
这个类有两个实体子类:MultipleTextOutputFormat,属于TextOutputFormat的多版本文件;
MultipleSequenceFileOutputFormat,属于SequenceFileOutputFormat的多版本文件。
关键点:: MultipleOutputFormat类提供了一些子类覆盖来控制输出文件名的protected方法(generateFileNameForKeyValue()方法),这个方法的返回值将用来作为输出的文件名称
这里提到一点,我使用的时候开始继承的是MultipleTextOutputFormat,但是怎么调试都无效,还是输出 part-00000文件。然后各种查资料,最后好像网上有人说hadoop0.20.0版本还是之前的版本中存在bug,必须继承MultipleOutputFormat类才有效。但是我装的是hadoop1.04,不过我用的仍然是老版本的API在写,不知道是否是这个原因,有大神看到的话帮忙解惑下,这里谢谢了!
下面是我继承MultipleOutputFormat类实现reducer输出到多个文件的代码:
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.lib.MultipleOutputFormat; public class PartitionByStationUsingMultipleOutputormat extends Configured implements Tool { static class StationMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { parser.parse(value); output.collect(new Text(parser.getYear()), value); } } static class StationReducer extends MapReduceBase implements Reducer<Text, Text, Text, NullWritable> { @Override public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, NullWritable> output, Reporter reporter) throws IOException { while (values.hasNext()) { output.collect(values.next(), NullWritable.get()); } } } static class StationNameMultipleTextOutputFormat extends MultipleOutputFormat<Text, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); private OutputFormat<Text, NullWritable> theTextOutputFormat = null; protected String generateFileNameForKeyValue(Text key, NullWritable value, String name) { parser.parse(key); return parser.getStationId() + "/" + parser.getYear(); //return name + "-" + parser.getStationId(); } @Override protected RecordWriter<Text, NullWritable> getBaseRecordWriter( FileSystem arg0, JobConf arg1, String arg2, Progressable arg3) throws IOException { // TODO Auto-generated method stub if(theTextOutputFormat==null){ theTextOutputFormat= new TextOutputFormat<Text, NullWritable>(); } return theTextOutputFormat.getRecordWriter(arg0, arg1, arg2, arg3); } } @Override public int run(String[] args) throws Exception { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1; } conf.setMapperClass(StationMapper.class); conf.setMapOutputKeyClass(Text.class); conf.setReducerClass(StationReducer.class); conf.setOutputKeyClass(NullWritable.class); conf.setOutputFormat(StationNameMultipleTextOutputFormat.class); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new PartitionByStationUsingMultipleOutputormat(), args); System.exit(exitCode); //args //hdfs://192.168.174.128:9000/user/root/input/Temperature.txt hdfs://192.168.174.128:9000/user/root/output/test1234567890 } }
这里面引用了两个帮助类:JobBuilder,NcdcRecordParser。之后的笔记中也会使用到,就一并在这里贴出来了。
JobBuilder:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.Tool; public class JobBuilder { public static JobConf parseInputAndOutput(Tool tool, Configuration conf, String[] args) { if (args.length != 2) { printUsage(tool, "<inout> <output>"); return null; } JobConf jobConf = new JobConf(conf, tool.getClass()); FileInputFormat.addInputPath(jobConf, new Path(args[0])); FileOutputFormat.setOutputPath(jobConf, new Path(args[1])); return jobConf; } public static void printUsage(Tool tool, String extraArgsUsage) { System.err.printf("Usage: % [genericOptions] %s\n\n", tool .getClass().getSimpleName(), extraArgsUsage); } }
NcdcRecordParser:
import org.apache.hadoop.io.Text; public class NcdcRecordParser { private static final int MISSING_TEMPERATURE = 9999; private String year; private String stationId; private int airTemperature; boolean isMalformed; public void parse(String record) { year = record.substring(5, 9); stationId = record.substring(0, 2); String airTemperatureString; // Remove leading plus sign as parseInt doesn't like them airTemperatureString = record.substring(15, 19); try { airTemperature = Integer.parseInt(airTemperatureString); isMalformed = false; } catch (NumberFormatException e) { isMalformed = true; } } public void parse(Text record) { parse(record.toString()); } public boolean isValidTemperature() { return airTemperature != MISSING_TEMPERATURE && !isMalformed; } public boolean isMissingTemperature() { return airTemperature == MISSING_TEMPERATURE; } public boolean isMalformedTemperature() { return isMalformed; } public String getYear() { return year; } public String getStationId() { return stationId; } public int getAirTemperature() { return airTemperature; } }
输出文件截个图吧:
截图中看到的是以getStationId()命名的子文件夹,在这些文件夹中是以getYear()命名的文件。
MultipleOutputs类
这个类用于在原有输出基础上附加输出,输出是指定名称的,可以写到一个文件或者多个文件中。
使用这个类的静态方法addMultiNamedOutput来设置输出名称,(其实是输出文件名的前缀了,后面都会加上其他数据)
需要注意的一点是,在这个静态方法中指定的名称,必须在Reducer中的MultipleOutputs类的实例方法 getCollector中接收,即作为这个方法的第一个参数传入,不对应的话我试了下会报错。这个报错是因为啥呢?(没找到,或者我还没理解到,以后补充吧,同时希望看到帖子的大神帮忙解惑,这里谢谢了!)
下面是实现的代码,同样使用到了上面贴出来的帮助类:
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.lib.MultipleOutputs; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.Tool; public class PartitionByStationUsingMultipleOutputs extends Configured implements Tool { static class StationMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,Text>{ private NcdcRecordParser parser = new NcdcRecordParser(); @Override public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { parser.parse(value); output.collect(new Text(parser.getStationId()), value); } } static class MultipleOutputsReducer extends MapReduceBase implements Reducer<Text,Text,NullWritable,Text>{ private MultipleOutputs multipleOutputs; @Override public void configure(JobConf conf){ multipleOutputs = new MultipleOutputs(conf); } @Override public void reduce(Text key, Iterator<Text> values, OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException { OutputCollector collector = multipleOutputs.getCollector("station", key.toString(),reporter); while(values.hasNext()){ collector.collect(NullWritable.get(), values.next()); } } @Override public void close() throws IOException{ multipleOutputs.close(); } } @Override public int run(String[] args) throws Exception { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1; } conf.setMapperClass(StationMapper.class); conf.setMapOutputKeyClass(Text.class); conf.setReducerClass(MultipleOutputsReducer.class); conf.setOutputKeyClass(NullWritable.class); conf.setOutputFormat(TextOutputFormat.class); MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class, NullWritable.class, Text.class); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new PartitionByStationUsingMultipleOutputs(), args); System.exit(exitCode); //args //hdfs://192.168.174.128:9000/user/root/input/Temperature.txt hdfs://192.168.174.128:9000/user/root/output/testMultipleOutputs123456789 } }
下面是运行结果截图:
截图中看到的是以"station"+getStationid()+partNum命名的文件。
上面主要介绍了MapReduce作业使用到的一些常用的输入格式,输出格式。(都是书本上的理论知识了,只能拿书本上的例子来练手,哎 苦于无项目实战,自己买机器配集群太不现实了,而且也搞不到实际需求和实际数据。悲催~~)
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Hadoop学习笔记(三):Hive简介
定义 Hive是一个构建在Hadoop上的数据仓库框架。可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。(来自百度百科~~) metastore metastore是Hive元数据的集中存放地,metastore包括两部分:服务和后台数据的存储。默认情况下,元数据存储在Derby数据库实例中。由于是本地数据库且不支持并发访问,故多作为练手使用。还一种使用其他数据库的方式称为“Local metastore”,支持并发访问,但metestore服务仍然和hive服务运行在同一个进程中,故称之为本地metastore。还有第三种方式:“Remote metastore”,metastore无武器和hive服务运行在不同的进程内,可以提供更好的可管理性和安全性。(怎么配Local metastore和Remte metastore先不搞了,先拿默认方式练手为主~...
- 下一篇
Hadoop学习笔记(二):MapReduce的特性-计数器、排序
计数器 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。说白了就是统计整个mr作业所有数据行中符合某个if条件的数量,(除某些内置计数器之外)。仅当一个作业执行成功之后,计数器的值才是完整可靠的。如果一个任务在作业执行期间失败,则相关计数器值会减小,计数器是全局的。 计数器分为以下几种: 1)内置计数器,内置的作业计数器实际上由jobtracker维护,而不必在整个网络中发送; 2)用户自定义的java计数器,由其关联任务维护,并定期传到tasktracker,再由tasktracker传给jobtracker,可以定义多个枚举类型,每个枚举类型有多个字段,枚举类型名称即为组名,枚举字段名称即为计数器名称。 Reporter对象的incrCounter()方法重载: public void incrCounter(enum,long amout) 3)动态计数器,不由java枚举类型定义的计数器,由于在编译阶段就已指定java枚举类型的字段,故无法使用枚举类型动态新建计数器。 Reporter对象的incrC...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Red5直播服务器,属于Java语言的直播服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS关闭SELinux安全模块
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16