通过代码来实现.
我们需要3样东西: 一个map函数, 一个reduce函数和一些用来运行作业的代码.
map函数由Mapper接口实现来表示, 后者声明了一个map()方法.
import java.io.IOException;
// 是hadoop针对流处理优化的类型
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
// 会继承这个基类
import org.apache.hadoop.mapred.MapReduceBase;
// 会实现这个接口
import org.apache.hadoop.mapred.Mapper;
// 处理后数据由它来收集
import org.apache.hadoop.mapred.OoutputCollector;
import org.apache.hadoop.mapred.Reporter;
// 虽然还没有开始系统学习java语法, 我猜测, extends是继承基类,
// implements 是实现接口, java把它们语法上分开了
public class MaxTemperatureMapper extends MapReduceBase
// Mapper是一个泛型接口
implements Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if( line.charAt(87) == '+') {// parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && quality.matches("01459]")) {
output.collect(new Text(year), new IntWritable(airtemperature));
}
}
}
Mapper是一个泛型接口:
Mapper<LongWritable, Text, Text, IntWritable>
它有4个形参类型, 分别是map函数的输入键, 输入值, 输出键和输出值的类型.
就目前来说, 输入键是长整数偏移量, 输入值是一行文本, 输出键是年份, 输出值是气温(整数).
Hadoop提供了一套可优化网络序列化传输的基本类型, 不直接使用java内嵌的类型. 在这里, LongWritable 相当于 Long, IntWritable 相当于 Int, Text 相当于 String.
map() 方法的输入是一个键和一个值.
map() 还提供了 OutputCollector 实例用于输出内容的写入.
reduce函数通过Reducer进行类似的定义.
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReducer extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, Intwritable> output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
reduce函数的输入键值对必须与map函数的输出键值对匹配.
reduce函数的输出键值对类型为Text/IntWritable, 分别表示年份和最高气温.
第三部分的代码为负责运行MapReduce的作业.
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if(args.length !=2 ) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperatuer.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatuerMapper.class);
conf.setReducerClass(MaxTemperatuerReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
JobConf 对象制定了作业的执行规范. 构造函数的参数为作业所在的类, Hadoop会通过该类来查找包含给类的JAR文件.
构造 JobConf 对象后, 制定输入和输出数据的路径. 这里是通过 FileInputFormat 的静态方法 addInputPath() 来定义输入数据的路径, 路径可以是单个文件, 也可以是目录(即目录下的所有文件)或符合特定模式的一组文件. 可以多次调用(从名称可以看出, addInputPath() ).
同理, FileOutputFormat.setOutputPath() 指定输出路径. 即写入目录. 运行作业前, 写入目录不应该存在, Hadoop会拒绝并报错. 这样设计, 主要是防止数据被覆盖, 数据丢失. 毕竟Hadoop运行的时间是很长的, 丢失了非常恼人.
FileOutputFormat.setOutputPath() 和 conf.setMapperClass() 指定map和reduce类型.
接着, setOutputKeyClass 和 setOutputValueClass 指定map和reduce函数的 输出 类型, 这两个函数的输出类型往往相同. 如果不同, map的输出函数类型通过 setMapOutputKeyClass 和 setMapOutputValueClass 指定.
输入的类型用 InputFormat 设置, 本例中没有指定, 使用的是默认的 TextInputFormat (文本输入格式);
最后, JobClient.runJob() 会提交作业并等待完成, 将结果写到控制台.