MapReduce编程job概念原理
在Hadoop中,每个MapReduce任务都被初始化为一个job,每个job又可分为两个阶段:map阶段和reduce阶段。这两个阶段分别用两个函数来表示。Map函数接收一个<key,value>形式的输入,然后同样产生一个<ey,value>形式的中间输出,Hadoop会负责将所有具有相同中间key值的value集合在一起传递给reduce函数,reduce函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式。
简易代码:
public static class Map extends MapReduceBase implments Mapper<LongWritable,Text,Text,IntWritable>{ //设置常量1,用来形成<word,1>形式的输出 private fianll static IntWritable one = new IntWritable(1) private Text word = new Text(); public void map(LongWritable key,Text value,OutputCollector<Text,output,Reporter reporter) throws IOException{ //hadoop执行map函数时为是一行一行的读取数据处理,有多少行,就会执行多少次map函数 String line = value.toString(); //进行单词的分割,可以多传入进行分割的参数 StringTokenizer tokenizer = new StringTokenizer(line); //遍历单词 while(tokenizer.hasMoreTokens()){ //往Text中写入<word,1> word.set(tokenizer.nextToken()); output.collect(word,one); } } } //需要注意的是,reduce将相同key值(这里是word)的value值收集起来,形成<word,list of 1>的形式,再将这些1累加 public static class Reduce extends MapReduceBase implements Reducer<Text IntWritable,Text,IntWritable>{ public void reduce(Text key,Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException{ //初始word个数设置 int sum = 0; while(values,hasNext()){ //单词个数相加 sum += value.next().get(); } output.collect(key,new IntWritbale(sum)); } }
执行概念总结:
job.setInputFormatClass(TextInputFormat.class);
1.InputFormat()和inputSplit
inputSplit是Hadoop定义的用来传送给每个单独的map的数据,InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组,生成InputSplit的方法可以通过InputFormat(I)来设置。当数据传送给map时,map会将输入分片传送到inputFormat上,InputFormat则调用getREcordReduer()方法生成RecordReader,RecordReader再通过createKey()、createValue()方法创建可供map处理的<key,value>对,即<k1,v1>,简而言之InputFormat方法是用来生成可供map处理的<key,value>对的。
在这里如果不设置的话,TextInputFormat会是Hadoop默认的输入方法,在TextInputFormat中,每个人间(或其一部分)都会单独地作为map的输入,继承自FileInputFormat,之后,每行数据都会生成一条记录,每条记录则表示成<key,value>形式:
其中,key值是每个数据的记录在数据分片中的字节偏移量,数据类型是LongWritable.
value值是每行的内容,数据类型是Text。
job.setOutputValueClass(TextInputFormat.class);
2、OutputFormat
每一种输入格式都有一种输出格式与其对应。同样,默认的输出格式是TextOutputFormat,这种输出方式与输入类似,会将每条记录以一行的形式存入文本文件。不过它的键和值都可以以任意形式的,因为程序内部会调用toString()方法将键和值转换为String类型再输出。
3、map和reduce
map函数接收经过inputFormat处理产生的<k1,v1>,然后输出<k2,v2>,map函数老的版本写法是继承MapReduceBase然后实现Mapper接口,但是现在可以直接继承Mapper接口,此接口是一个泛型类型,有4种形式的参数,分别用来指定map的输入key值类型(LongWritable key),输入value值类型(Text value)、输出key值类型和(Text)输出value值类型(IntWritable,本例是reporter)。
reduce函数以map的输出作为输入,因此reduce的输入类型是<Text,IntWritable>.而reduce的输出是单词和它的数目,因此,它的输出类型是<Text,IntWritable>
4、任务调度
计算方面:Hadoop总会有限将任务分配给空闲的机器,使所有的任务能公平地分享系统资源,I/O方面:Hadoop会尽量将map任务分配给InputSplit所在机器,以减少网络I/O的消耗。
5、数据预处理与InputSplit的大小。
Hadoop会在处理每个block后将其作为一个InputSplit,因此合理地甚至block块大小是很重要的。也可通过合理地设置map任务的数量来调节map任务的数据输入。
6、map和reduce任务的数量
设置map任务槽和reduce任务槽,map/reduce任务槽是这个集群能够同时运行的map/reduce任务的最大数量。可以通过hadoop的配置文件设置每台机器最多可以同时运行map任务和reduce任务的个数,比如有10台机器,设置每台最多可以同时运行10个map任务和5个reduce任务,那么这个集群的map任务槽就是1000,reduce任务槽就是500.一般来说,设置的reduce任务数量应该是reduce任务槽的0.95或是1.75倍
7、combine函数
combine函数是用于在本地合并数据的函数,从wordcount程序中,词频是一个接近于zipf分布的,每个map任务可能会产生成千上万个<the,i>记录,若将这些记录一一传给reduce任务是很耗时的,所以可以设置一个combine函数,用于本地合并,大大减少网络I/O操作的消耗。
job.setCombinerClass(combine.class); //指定reduce函数为combine函数 job.setReducerClass(Reduce.class);
8、Hadoop流的工作原理
当一个可执行文件作为Mapper时,每个map任务会以一个独立的进程启动这个可执行文件,然后在map任务运行时,会把输入切分成行提供给可执行文件,并作为它的标准输入(stdin)内容。当可执行文件运行处结果时,map从标准输出(stdout)中手机数据,并将其转化为<key,value>对,作为map的输出。
参考:<Hadoop实战>

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Hadoop2.6.0运行mapreduce之推断(speculative)执行(上)
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/51249119 前言 当一个应用向YARN集群提交作业后,此作业的多个任务由于负载不均衡、资源分布不均等原因都会导致各个任务运行完成的时间不一致,甚至会出现一个任务明显慢于同一作业的其它任务的情况。如果对这种情况不加优化,最慢的任务最终会拖慢整个作业的整体执行进度。好在mapreduce框架提供了任务推断执行机制,当有必要时就启动一个备份任务。最终会采用备份任务和原任务中率先执行完的结果作为最终结果。 由于具体分析推断执行机制,篇幅很长,所以我会分成几篇内容陆续介绍。 推断执行测试 本文在我自己搭建的集群(集群搭建可以参阅《Linux下Hadoop2.6.0集群环境的搭建》一文)上,执行wordcount例子,来验证mapreduce框架的推断机制。我们输入以下命令: hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D map...
- 下一篇
Storm Topology及分组原理
Storm的通信机制,需要满足如下一些条件以满足Storm的语义。 1、建立数据传输的缓冲区。在通信连接没有建立之前把发送的数据缓存起来。数据发送方可以在连接建立之前发送消息,而不需要等连接建立起来,可是的接收方是独立运行的。 2、在消息传输层保证消息最多只能发送一次,Storm系统有ACK机制,是的没有被发送成功的消息会被重发,若消息层面也重发,会导致消息发送多次。 这种消息机制由两个接口来定义,backtype.storm.messaging.IContext和backtype.storm.messaging.IConnection. IContext负责客户端和服务器端建立的连接,主要有四个方法。 1、prepare(Map stormConf):总从Storm定义的prepare方法,可以接收storm的配置。 2、term():终止,方法会在worker卸载这个传输插件的时候调用,自定义实现时可以在这里释放占用的资源。 3、bind(String topologyId,int port):建立服务器端的连接。 4、connect(String stormId,String h...
相关文章
文章评论
共有0条评论来说两句吧...