MapReduce框架Mapper和Reducer类源码分析
一:Mapper类
在Hadoop的mapper类中,有4个主要的函数,分别是:setup,cleanup,map,run。代码如下:
- protected void setup(Context context) throws IOException, InterruptedException {
- // NOTHING
- }
- protected void map(KEYIN key, VALUEIN value,
- Context context) throws IOException, InterruptedException {
- context.write((KEYOUT) key, (VALUEOUT) value);
- }
- protected void cleanup(Context context) throws IOException, InterruptedException {
- // NOTHING
- }
- public void run(Context context) throws IOException, InterruptedException {
- setup(context);
- while (context.nextKeyValue()) {
- map(context.getCurrentKey(), context.getCurrentValue(), context);
- }
- cleanup(context);
- }
- }
二:Reducer类
- /**
- * Called once at the start of the task.
- */
- protected void setup(Context context
- ) throws IOException, InterruptedException {
- // NOTHING
- }
- /**
- * This method is called once for each key. Most applications will define
- * their reduce class by overriding this method. The default implementation
- * is an identity function.
- */
- @SuppressWarnings("unchecked")
- protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
- ) throws IOException, InterruptedException {
- for(VALUEIN value: values) {
- context.write((KEYOUT) key, (VALUEOUT) value);
- }
- }
- /**
- * Called once at the end of the task.
- */
- protected void cleanup(Context context
- ) throws IOException, InterruptedException {
- // NOTHING
- }
- /*
- * control how the reduce task works.
- */
- @SuppressWarnings("unchecked")
- public void run(Context context) throws IOException, InterruptedException {
- setup(context);
- while (context.nextKey()) {
- reduce(context.getCurrentKey(), context.getValues(), context);
- // If a back up store is used, reset it
- ((ReduceContext.ValueIterator)
- (context.getValues().iterator())).resetBackupStore();
- }
- cleanup(context);
- }
- }
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
MapReduce 二次排序详解
1 首先说一下工作原理: 在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一...
-
下一篇
MapReduce框架Partitioner分区方法
前言:对于二次排序相信大家也是似懂非懂,我也是一样,对其中的很多方法都不理解诶,所有只有暂时放在一边,当你接触到其他的函数,你知道的越多时你对二次排序的理解也就更深入了,同时建议大家对wordcount的流程好好分析一下,要真正的知道每一步都是干什么的。 1.Partitioner分区类的作用是什么?2.getPartition()三个参数分别是什么?3.numReduceTasks指的是设置的Reducer任务数量,默认值是是多少?扩展:如果不同类型的数据被分配到了同一个分区,输出的数据是否还是有序的? 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partit...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7设置SWAP分区,小内存服务器的救世主
- Crontab安装和使用
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- MySQL数据库在高并发下的优化方案
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池

微信收款码
支付宝收款码