Following the Hadoop tutorial, we wrote a simple example that analyzes weather data and finds the highest temperature of a given year.
It is written against the new Hadoop 0.20 API; the code is as follows:
The Mapper class:
- package com.charles.parseweather;
-
- import java.io.IOException;
-
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
-
- /**
-  * Extracts (year, temperature) pairs from fixed-width NCDC weather records.
-  */
- public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
-
-     // Sentinel value used in the raw data for a missing temperature reading.
-     private static final int MISSING = 9999;
-
-     @Override
-     public void map(LongWritable key, Text value, Context context)
-             throws IOException, InterruptedException {
-
-         String line = value.toString();
-
-         // The year occupies columns 15-18 of the fixed-width record.
-         String year = line.substring(15, 19);
-
-         // The temperature sign sits at column 87 and the digits at columns 88-91;
-         // strip a leading '+' so that parseInt only sees digits.
-         int airTemperature;
-         if (line.charAt(87) == '+') {
-             airTemperature = Integer.parseInt(line.substring(88, 92));
-         } else {
-             airTemperature = Integer.parseInt(line.substring(87, 92));
-         }
-
-         // Column 92 holds the quality code; keep only valid, non-missing readings.
-         String quality = line.substring(92, 93);
-         if (airTemperature != MISSING && quality.matches("[01459]")) {
-             context.write(new Text(year), new IntWritable(airTemperature));
-         }
-     }
- }
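To make the fixed-width offsets used in map() easier to follow, here is a small standalone sketch (not part of the job) that builds a hypothetical record and pulls out the same fields; in real NCDC data the temperature field is expressed in tenths of a degree:
- // Hypothetical record built only for this sketch; the filler zeros stand in
- // for all the fields the mapper never looks at.
- public class OffsetSketch {
-     public static void main(String[] args) {
-         char[] record = new char[93];
-         java.util.Arrays.fill(record, '0');
-         "1901".getChars(0, 4, record, 15);   // year               -> columns 15..18
-         record[87] = '+';                    // sign               -> column 87
-         "0370".getChars(0, 4, record, 88);   // temperature digits -> columns 88..91
-         record[92] = '1';                    // quality code       -> column 92
-         String line = new String(record);
-
-         System.out.println(line.substring(15, 19));                    // 1901
-         System.out.println(Integer.parseInt(line.substring(88, 92)));  // 370
-         System.out.println(line.substring(92, 93));                    // 1
-     }
- }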
The Reducer class:
- package com.charles.parseweather;
-
- import java.io.IOException;
-
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
-
- /**
-  * Receives all the temperatures recorded for a year and emits the maximum one.
-  */
- public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
-
-     @Override
-     public void reduce(Text key, Iterable<IntWritable> values, Context context)
-             throws IOException, InterruptedException {
-
-         // Walk every temperature for this year and keep the largest.
-         int maxValue = Integer.MIN_VALUE;
-         for (IntWritable value : values) {
-             maxValue = Math.max(maxValue, value.get());
-         }
-
-         context.write(key, new IntWritable(maxValue));
-     }
- }
The driver class, which submits the job to the MapReduce framework and writes out the result:
- package com.charles.parseweather;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- /**
-  * Driver: configures the job, wires up the mapper and reducer, and submits it.
-  */
- public class MaxTemperature {
-
-     public static void main(String[] args) throws Exception {
-
-         if (args.length != 2) {
-             System.err.println("Usage: MaxTemperature <input path> <output path>");
-             System.exit(-1);
-         }
-
-         Configuration conf = new Configuration();
-         // Run the job as the hadoop-user account on the remote cluster.
-         conf.set("hadoop.job.ugi", "hadoop-user,hadoop-user");
-
-         Job job = new Job(conf, "Get Maximum Weather Information! ^_^");
-
-         // Ship the jar containing this class to the cluster.
-         job.setJarByClass(MaxTemperature.class);
-
-         // First argument: input file on HDFS; second argument: output directory.
-         FileInputFormat.addInputPath(job, new Path(args[0]));
-         FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
-         job.setMapperClass(MaxTemperatureMapper.class);
-         job.setReducerClass(MaxTemperatureReducer.class);
-         job.setOutputKeyClass(Text.class);
-         job.setOutputValueClass(IntWritable.class);
-
-         // Block until the job finishes and exit with its status.
-         System.exit(job.waitForCompletion(true) ? 0 : 1);
-     }
- }
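One optional refinement the original driver does not include: because taking a maximum is associative and commutative, the reducer class can safely double as a combiner, so each map task pre-aggregates its own output and less data is shuffled to the reducer. A one-line sketch:
- // ... added alongside the other job.set* calls in the driver above:
- // Optional: reuse the reducer as a combiner to pre-aggregate map output locally.
- job.setCombinerClass(MaxTemperatureReducer.class);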
The driver takes two arguments: the first is the location in HDFS of the file containing the sample data, and the second is the HDFS directory where the output of the job will be written.
We first put the sample data (1901.txt) into that input location and confirm that it is there.
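The same upload-and-check step can also be done from Java instead of the shell; this is a minimal sketch under the assumption that the namenode address and HDFS paths match the cluster shown in the logs further down (the local path is made up):
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
-
- public class UploadSample {
-     public static void main(String[] args) throws Exception {
-         Configuration conf = new Configuration();
-         // Namenode address taken from the job output below; adjust for your cluster.
-         conf.set("fs.default.name", "hdfs://192.168.129.35:9000");
-         FileSystem fs = FileSystem.get(conf);
-
-         // Copy the local sample file into the HDFS input directory, then verify it.
-         Path local = new Path("/tmp/1901.txt");   // hypothetical local location
-         Path input = new Path("/user/hadoop-user/input/1901.txt");
-         fs.copyFromLocalFile(local, input);
-         System.out.println(input + " exists: " + fs.exists(input));
-     }
- }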
Of course, you can also browse the file system with the Hadoop view inside the IDE.
Then the two arguments (the HDFS input file and the output directory) are passed in through the run configuration.
The first run failed with a Java heap space (out-of-memory) error.
The JVM launched from the IDE defaults to a 64 MB heap, so we enlarge it, for example by adding a larger -Xmx setting (such as -Xmx512m, an illustrative value) to the run configuration's VM arguments.
The job finally runs successfully, and the whole process is shown on the console:
- 12/05/25 18:33:37 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
- 12/05/25 18:33:37 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
- 12/05/25 18:33:37 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
- 12/05/25 18:33:42 INFO input.FileInputFormat: Total input paths to process : 1
- 12/05/25 18:33:42 INFO mapred.JobClient: Running job: job_local_0001
- 12/05/25 18:33:42 INFO input.FileInputFormat: Total input paths to process : 1
- 12/05/25 18:33:42 INFO mapred.MapTask: io.sort.mb = 100
- 12/05/25 18:33:42 INFO mapred.MapTask: data buffer = 79691776/99614720
- 12/05/25 18:33:42 INFO mapred.MapTask: record buffer = 262144/327680
- 12/05/25 18:33:42 INFO mapred.MapTask: Starting flush of map output
- 12/05/25 18:33:43 INFO mapred.MapTask: Finished spill 0
- 12/05/25 18:33:43 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
- 12/05/25 18:33:43 INFO mapred.LocalJobRunner:
- 12/05/25 18:33:43 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
- 12/05/25 18:33:43 INFO mapred.LocalJobRunner:
- 12/05/25 18:33:43 INFO mapred.Merger: Merging 1 sorted segments
- 12/05/25 18:33:43 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 72206 bytes
- 12/05/25 18:33:43 INFO mapred.LocalJobRunner:
- 12/05/25 18:33:43 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
- 12/05/25 18:33:43 INFO mapred.LocalJobRunner:
- 12/05/25 18:33:43 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
- 12/05/25 18:33:43 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.129.35:9000/user/hadoop-user/output
- 12/05/25 18:33:43 INFO mapred.LocalJobRunner: reduce > reduce
- 12/05/25 18:33:43 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
- 12/05/25 18:33:43 INFO mapred.JobClient: map 100% reduce 100%
- 12/05/25 18:33:43 INFO mapred.JobClient: Job complete: job_local_0001
- 12/05/25 18:33:43 INFO mapred.JobClient: Counters: 14
- 12/05/25 18:33:43 INFO mapred.JobClient: FileSystemCounters
- 12/05/25 18:33:43 INFO mapred.JobClient: FILE_BYTES_READ=105868
- 12/05/25 18:33:43 INFO mapred.JobClient: HDFS_BYTES_READ=1776380
- 12/05/25 18:33:43 INFO mapred.JobClient: FILE_BYTES_WRITTEN=212428
- 12/05/25 18:33:43 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=9
- 12/05/25 18:33:43 INFO mapred.JobClient: Map-Reduce Framework
- 12/05/25 18:33:43 INFO mapred.JobClient: Reduce input groups=1
- 12/05/25 18:33:43 INFO mapred.JobClient: Combine output records=0
- 12/05/25 18:33:43 INFO mapred.JobClient: Map input records=6565
- 12/05/25 18:33:43 INFO mapred.JobClient: Reduce shuffle bytes=0
- 12/05/25 18:33:43 INFO mapred.JobClient: Reduce output records=1
- 12/05/25 18:33:43 INFO mapred.JobClient: Spilled Records=13128
- 12/05/25 18:33:43 INFO mapred.JobClient: Map output bytes=59076
- 12/05/25 18:33:43 INFO mapred.JobClient: Combine input records=0
- 12/05/25 18:33:43 INFO mapred.JobClient: Map output records=6564
- 12/05/25 18:33:43 INFO mapred.JobClient: Reduce input records=6564
From the console output we can see the following:
Line 2: a warning that GenericOptionsParser (via the Tool interface) should be used to parse the arguments we passed in (the input file and the output directory); line 4 then shows that only one input path was found.
Line 5: our job (this main application) is assigned the job id job_local_0001.
Lines 7-11 come from the MapTask: the map-side buffer settings followed by the flush of the map output.
Line 7: io.sort.mb is the size of the in-memory buffer that holds map output, 100 MB here; map output is not written straight to disk but buffered, and a background thread spills it to disk once the buffer fills up (see the configuration sketch after this list).
Line 11: each flush of the buffer to disk produces a spill file; this line marks the completion of that spill (spill 0).
Lines 12-14: the map task, with id attempt_local_0001_m_000000_0, completes.
Lines 16-17: the sorted map outputs are merged before the reduce runs.
Lines 18-19: the reduce task, with id attempt_local_0001_r_000000_0, completes.
Lines 20-24: the reduce task commits its result to HDFS, into the output directory given as the second command-line argument.
Lines 25-43: a summary report (the counters) of the whole MapReduce run.
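For reference, the 100 MB buffer mentioned on line 7 is the io.sort.mb property. A hedged sketch of raising it, say when a job spills too often; the value 200 is only illustrative and is not something this post actually changes:
- import org.apache.hadoop.conf.Configuration;
-
- // ... inside the driver, before the Job is created:
- Configuration conf = new Configuration();
- // Give the map-side sort buffer 200 MB instead of the 100 MB seen in the log.
- conf.setInt("io.sort.mb", 200);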
Checking the file system confirms the result we expected: the highest temperature recorded for 1901 is 37 °C (the part file itself holds the mapper's raw value, which in the NCDC format is expressed in tenths of a degree).
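The same check can be done programmatically; a small sketch that reads the result file straight from HDFS, assuming the output path produced by the run above:
- import java.io.BufferedReader;
- import java.io.InputStreamReader;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
-
- public class ReadResult {
-     public static void main(String[] args) throws Exception {
-         Configuration conf = new Configuration();
-         conf.set("fs.default.name", "hdfs://192.168.129.35:9000");
-         FileSystem fs = FileSystem.get(conf);
-
-         // The reducer wrote a single part file under the output directory.
-         Path result = new Path("/user/hadoop-user/output/part-r-00000");
-         BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(result)));
-         try {
-             String line;
-             while ((line = reader.readLine()) != null) {
-                 System.out.println(line);   // one line: the year and its maximum temperature
-             }
-         } finally {
-             reader.close();
-         }
-     }
- }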
We are not done yet: we also want to see what the namenode, datanode and secondary namenode were each doing.
The namenode does not perform any of the actual computation; it splits the file into blocks and assigns them to the datanodes, and its audit log records every file-system operation of the job:
- 2012-05-25 18:31:42,170 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=open src=/user/hadoop-user/input/1901.txt dst=null perm=null
- 2012-05-25 18:31:42,284 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=open src=/user/hadoop-user/input/1901.txt dst=null perm=null
- 2012-05-25 18:31:42,287 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Number of transactions: 21 Total time for transactions(ms): 0Number of transactions batched in Syncs: 1 Number of syncs: 14 SyncTimes(ms): 8
- 2012-05-25 18:31:42,288 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=mkdirs src=/user/hadoop-user/output/_temporary dst=null perm=hadoop-user:supergroup:rwxr-xr-x
- 2012-05-25 18:31:42,398 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=open src=/user/hadoop-user/input/1901.txt dst=null perm=null
- 2012-05-25 18:31:43,033 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=create src=/user/hadoop-user/output/_temporary/_attempt_local_0001_r_000000_0/part-r-00000 dst=null perm=hadoop-user:supergroup:rw-r
- 2012-05-25 18:31:43,054 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.allocateBlock: /user/hadoop-user/output/_temporary/_attempt_local_0001_r_000000_0/part-r-00000. blk_624828232551808657_1006
- 2012-05-25 18:31:43,068 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.addStoredBlock: blockMap updated: 192.168.129.35:50010 is added to blk_624828232551808657_1006 size 9
- 2012-05-25 18:31:43,074 INFO org.apache.hadoop.hdfs.StateChange: DIR* NameSystem.completeFile: file /user/hadoop-user/output/_temporary/_attempt_local_0001_r_000000_0/part-r-00000 is closed by DFSClient_281756056
- 2012-05-25 18:31:43,079 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=listStatus src=/user/hadoop-user/output/_temporary/_attempt_local_0001_r_000000_0 dst=null perm=null
- 2012-05-25 18:31:43,081 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=mkdirs src=/user/hadoop-user/output dst=null perm=hadoop-user:supergroup:rwxr-xr-x
- 2012-05-25 18:31:43,084 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=rename src=/user/hadoop-user/output/_temporary/_attempt_local_0001_r_000000_0/part-r-00000 dst=/user/hadoop-user/output/part-r-00000 perm=hadoop-user:supergroup:rw-r
- 2012-05-25 18:31:43,086 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=delete src=/user/hadoop-user/output/_temporary/_attempt_local_0001_r_000000_0 dst=null perm=null
- 2012-05-25 18:31:43,090 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=hadoop-user,hadoop-user ip=/192.168.40.16 cmd=delete src=/user/hadoop-user/output/_temporary dst=null perm=null
- 2012-05-25 18:32:09,469 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: ugi=root,Domain,Users,Remote,Desktop,Users,Users ip=/192.168.40.16 cmd=listStatus src=/ dst=null perm=null
The datanode does the actual block-level work: it receives, stores and serves the data blocks that the job reads and writes (the MapReduce computation itself ran in the local job runner, as the job id job_local_0001 above shows):
- 2012-05-25 17:41:20,336 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /192.168.129.35:43564, dest: /192.168.129.35:50010, bytes: 888190, op: HDFS_WRITE, cliID: DFSClient_150980970, srvID: DS-1002949858-192.168.129.35-50010-1337839176422, blockid: blk_-3989731445395160971_1003
- 2012-05-25 17:41:20,336 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder 0 for block blk_-3989731445395160971_1003 terminating
- 2012-05-25 17:49:18,074 INFO org.apache.hadoop.hdfs.server.datanode.DataBlockScanner: Verification succeeded for blk_-3989731445395160971_1003
- 2012-05-25 17:50:46,190 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: BlockReport of 2 blocks got processed in 1 msecs
- 2012-05-25 17:50:57,918 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /192.168.129.35:50010, dest: /192.168.40.16:3288, bytes: 895130, op: HDFS_READ, cliID: DFSClient_-1875022449, srvID: DS-1002949858-192.168.129.35-50010-1337839176422, blockid: blk_-3989731445395160971_1003
- 2012-05-25 18:02:43,887 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving block blk_-3335953923528359002_1004 src: /192.168.129.35:43677 dest: /192.168.129.35:50010
- 2012-05-25 18:02:43,906 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /192.168.129.35:43677, dest: /192.168.129.35:50010, bytes: 8117, op: HDFS_WRITE, cliID: DFSClient_-1876464305, srvID: DS-1002949858-192.168.129.35-50010-1337839176422, blockid: blk_-3335953923528359002_1004
- 2012-05-25 18:02:43,918 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder 0 for block blk_-3335953923528359002_1004 terminating
- 2012-05-25 18:10:20,759 INFO org.apache.hadoop.hdfs.server.datanode.DataBlockScanner: Verification succeeded for blk_-3335953923528359002_1004
- 2012-05-25 18:14:38,593 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /192.168.129.35:50010, dest: /192.168.40.16:3417, bytes: 8181, op: HDFS_READ, cliID: DFSClient_-677944339, srvID: DS-1002949858-192.168.129.35-50010-1337839176422, blockid: blk_-3335953923528359002_1004
- 2012-05-25 18:14:38,985 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving block blk_1243738109451377502_1005 src: /192.168.40.16:3418 dest: /192.168.129.35:50010
- 2012-05-25 18:14:38,997 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /192.168.40.16:3418, dest: /192.168.129.35:50010, bytes: 8, op: HDFS_WRITE, cliID: DFSClient_-677944339, srvID: DS-1002949858-192.168.129.35-50010-1337839176422, blockid: blk_1243738109451377502_1005
- 2012-05-25 18:14:38,997 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder 0 for block blk_1243738109451377502_1005 terminating
- 2012-05-25 18:14:59,063 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /192.168.129.35:50010, dest: /192.168.40.16:3419, bytes: 12, op: HDFS_READ, cliID: DFSClient_-1875022449, srvID: DS-1002949858-192.168.129.35-50010-1337839176422, blockid: blk_1243738109451377502_1005
The secondary namenode, finally, talks to the namenode and builds checkpoints of its metadata so that the namenode can be recovered after a failure:
- 2012-05-25 18:05:04,619 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Number of transactions: 0 Total time for transactions(ms): 0Number of transactions batched in Syncs: 0 Number of syncs: 0 SyncTimes(ms): 0
- 2012-05-25 18:05:04,626 INFO org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: Downloaded file fsimage size 789 bytes.
- 2012-05-25 18:05:04,626 INFO org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: Downloaded file edits size 1261 bytes.
- 2012-05-25 18:05:04,628 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: fsOwner=hadoop-user,hadoop-user
- 2012-05-25 18:05:04,628 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: supergroup=supergroup
- 2012-05-25 18:05:04,628 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: isPermissionEnabled=true
- 2012-05-25 18:05:04,629 INFO org.apache.hadoop.hdfs.server.common.Storage: Number of files = 8
- 2012-05-25 18:05:04,630 INFO org.apache.hadoop.hdfs.server.common.Storage: Number of files under construction = 0
- 2012-05-25 18:05:04,633 INFO org.apache.hadoop.hdfs.server.common.Storage: Edits file /tmp/hadoop-hadoop-user/dfs/namesecondary/current/edits of size 1261 edits # 16 loaded in 0 seconds.
- 2012-05-25 18:05:04,638 INFO org.apache.hadoop.hdfs.server.common.Storage: Image file of size 1208 saved in 0 seconds.
- 2012-05-25 18:05:04,640 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Number of transactions: 0 Total time for transactions(ms): 0Number of transactions batched in Syncs: 0 Number of syncs: 0 SyncTimes(ms): 0
- 2012-05-25 18:05:04,642 INFO org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: Posted URL 0.0.0.0:50070putimage=1&port=50090&machine=192.168.129.35&token=-18:1535880146:0:1337940304000:1337936704022
- 2012-05-25 18:05:04,660 WARN org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: Checkpoint done. New Image Size: 1208
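The checkpoint above runs on a timer rather than per job; in this Hadoop version the interval is governed by the fs.checkpoint.period property (in seconds). The sketch below only illustrates shortening it for experiments; in practice this setting would normally live in the cluster configuration files rather than in code:
- import org.apache.hadoop.conf.Configuration;
-
- Configuration conf = new Configuration();
- // Checkpoint every 10 minutes instead of the default hour; illustrative value only.
- conf.setLong("fs.checkpoint.period", 600);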
This article is reposted from the 51CTO blog of charles_wang888; the original post is at http://blog.51cto.com/supercharles888/878422. Please contact the original author if you wish to reprint it.