Hadoop的一个MapReduce示例代码——统计单词个数（带排序功能）
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 package com.mzsx.hadoop; import java.io.IOException; import java.util.Random; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MySortWordCount { public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable( 1 ); // 类似于int类型 private Text word = new Text(); // 可以理解成String类型 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { System.err.println(key + "," + value); // 默认情况下即根据空格分隔字符串 String tmp=value.toString(); tmp=tmp.replace( '\'' , ' ' ); tmp=tmp.replace( '.' , ' ' ); tmp=tmp.replace( ',' , ' ' ); tmp=tmp.replace( ':' , ' ' ); tmp=tmp.replace( '!' , ' ' ); tmp=tmp.replace( ';' , ' ' ); tmp=tmp.replace( '?' 
, ' ' ); tmp=tmp.replace( '`' , ' ' ); tmp=tmp.replace( '"' , ' ' ); tmp=tmp.replace( '&' , ' ' ); tmp=tmp.replace( '(' , ' ' ); tmp=tmp.replace( ')' , ' ' ); tmp=tmp.replace( '-' , ' ' ); StringTokenizer itr = new StringTokenizer(tmp); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } }; } // Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { System.err.println(key + "," + values); int sum = 0 ; for (IntWritable val : values) { sum += val.get(); } result.set(sum); ; context.write(key, result); // 这是最后结果 }; } public static class SortMapper extends Mapper<Object, Text, IntWritable,Text>{ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { IntWritable times = new IntWritable( 1 ); Text password = new Text(); String eachline=value.toString(); String[] eachterm =eachline.split( "\t" ); password.set(eachterm[ 0 ]); times.set(Integer.parseInt(eachterm[ 1 ])); context.write(times,password); } } public static class SortReducer extends Reducer<IntWritable,Text,IntWritable,Text> { private Text password = new Text(); public void reduce(IntWritable key,Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text val : values) { password.set(val); context.write(key,password); } } } private static class IntDecreasingComparator extends IntWritable.Comparator { public int compare(WritableComparable a, WritableComparable b) { //return -super.compare(a, b); return super .compare(a, b); } public int compare( byte [] b1, int s1, int l1, byte [] b2, int s2, int l2) { //return -super.compare(b1, s1, l1, b2, s2, l2); return super .compare(b1, s1, l1, b2, s2, l2); } } public static void main(String[] args) throws Exception { // 
声明配置信息 Configuration conf = new Configuration(); // 声明Job Job job = new Job(conf, "Word Count" ); // 设置工作类 job.setJarByClass(MySortWordCount. class ); // 设置mapper类 job.setMapperClass(MyMapper. class ); // 可选 job.setCombinerClass(MyReducer. class ); // 设置合并计算类 job.setReducerClass(MyReducer. class ); // 设置key为String类型 job.setOutputKeyClass(Text. class ); // 设置value为int类型 job.setOutputValueClass(IntWritable. class ); //job.setInputFormatClass(KeyValueTextInputFormat.class); // 设置或是接收输入输出 /*FileInputFormat.setInputPaths(job, new Path("/user/root/aoman.txt")); FileOutputFormat.setOutputPath(job, new Path("/user/root/r3")); // 执行 System.exit(job.waitForCompletion(true) ? 0 : 1);*/ //定义一个临时目录,先将词频统计任务的输出结果写到临时目录中, 下一个排序任务以临时目录为输入目录。 FileInputFormat.addInputPath(job, new Path( "/user/root/aoman.txt" )); Path tempDir = new Path( "MySortWordCount-temp-" + Integer.toString( new Random().nextInt(Integer.MAX_VALUE))); FileOutputFormat.setOutputPath(job, tempDir); if (job.waitForCompletion( true )) { Job sortJob = new Job(conf, "csdnsort" ); sortJob.setJarByClass(MySortWordCount. class ); FileInputFormat.addInputPath(sortJob, tempDir); sortJob.setMapperClass(SortMapper. class ); FileOutputFormat.setOutputPath(sortJob, new Path( "/user/root/sort1" )); sortJob.setOutputKeyClass(IntWritable. class ); sortJob.setOutputValueClass(Text. class ); sortJob.setSortComparatorClass(IntDecreasingComparator. class ); FileSystem.get(conf).deleteOnExit(tempDir); System.exit(sortJob.waitForCompletion( true ) ? 0 : 1 ); } System.exit(job.waitForCompletion( true ) ? 0 : 1 ); } } 版权声明:原创作品,如需转载,请注明出处。否则将追究法律责任 本文转自 梦朝思夕 51CTO博客,原文链接:http://blog.51cto.com/qiangmzsx/1404661