Hadoop框架:MapReduce基本原理和入门案例
本文源码:GitHub·点这里 || GitEE·点这里
一、MapReduce概述
1、基本概念
Hadoop核心组件之一:分布式计算的方案MapReduce,是一种编程模型,用于大规模数据集的并行运算,其中Map(映射)和Reduce(归约)。
MapReduce既是一个编程模型,也是一个计算组件,处理的过程分为两个阶段,Map阶段:负责把任务分解为多个小任务,Reduce负责把多个小任务的处理结果进行汇总。其中Map阶段主要输入是一对Key-Value,经过map计算后输出一对Key-Value值;然后将相同Key合并,形成Key-Value集合;再将这个Key-Value集合转入Reduce阶段,经过计算输出最终Key-Value结果集。
2、特点描述
MapReduce可以实现基于上千台服务器并发工作,提供很强大的数据处理能力,如果其中单台服务挂掉,计算任务会自动转义到另外节点执行,保证高容错性;但是MapReduce不适应于实时计算与流式计算,计算的数据是静态的。
二、操作案例
1、流程描述
数据文件一般以CSV格式居多,数据行通常以空格分隔,这里需要考虑数据内容特点;
文件经过切片分配在不同的MapTask任务中并发执行;
MapTask任务执行完毕之后,执行ReduceTask任务,依赖Map阶段的数据;
ReduceTask任务执行完毕后,输出文件结果。
2、基础配置
hadoop: # 读取的文件源 inputPath: hdfs://hop01:9000/hopdir/javaNew.txt # 该路径必须是程序运行前不存在的 outputPath: /wordOut
3、Mapper程序
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text mapKey = new Text(); IntWritable mapValue = new IntWritable(1); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1、读取行 String line = value.toString(); // 2、行内容切割,根据文件中分隔符 String[] words = line.split(" "); // 3、存储 for (String word : words) { mapKey.set(word); context.write(mapKey, mapValue); } } }
4、Reducer程序
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { int sum ; IntWritable value = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { // 1、累加求和统计 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2、输出结果 value.set(sum); context.write(key,value); } }
5、执行程序
@RestController public class WordWeb { @Resource private MapReduceConfig mapReduceConfig ; @GetMapping("/getWord") public String getWord () throws IOException, ClassNotFoundException, InterruptedException { // 声明配置 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() ); hadoopConfig.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName() ); Job job = Job.getInstance(hadoopConfig); // Job执行作业 输入路径 FileInputFormat.addInputPath(job, new Path(mapReduceConfig.getInputPath())); // Job执行作业 输出路径 FileOutputFormat.setOutputPath(job, new Path(mapReduceConfig.getOutputPath())); // 自定义 Mapper和Reducer 两个阶段的任务处理类 job.setMapperClass(WordMapper.class); job.setReducerClass(WordReducer.class); // 设置输出结果的Key和Value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //执行Job直到完成 job.waitForCompletion(true); return "success" ; } }
6、执行结果查看
将应用程序打包放到hop01服务上执行;
java -jar map-reduce-case01.jar
三、案例分析
1、数据类型
Java数据类型与对应的Hadoop数据序列化类型;
Java类型 | Writable类型 | Java类型 | Writable类型 |
---|---|---|---|
String | Text | float | FloatWritable |
int | IntWritable | long | LongWritable |
boolean | BooleanWritable | double | DoubleWritable |
byte | ByteWritable | array | DoubleWritable |
map | MapWritable |
2、核心模块
Mapper模块:处理输入的数据,业务逻辑在map()方法中完成,输出的数据也是KV格式;
Reducer模块:处理Map程序输出的KV数据,业务逻辑在reduce()方法中;
Driver模块:将程序提交到yarn进行调度,提交封装了运行参数的job对象;
四、序列化操作
1、序列化简介
序列化:将内存中对象转换为二进制的字节序列,可以通过输出流持久化存储或者网络传输;
反序列化:接收输入字节流或者读取磁盘持久化的数据,加载到内存的对象过程;
Hadoop序列化相关接口:Writable实现的序列化机制、Comparable管理Key的排序问题;
2、案例实现
案例描述:读取文件,并对文件相同的行做数据累加计算,输出计算结果;该案例演示在本地执行,不把Jar包上传的hadoop服务器,驱动配置一致。
实体对象属性
public class AddEntity implements Writable { private long addNum01; private long addNum02; private long resNum; // 构造方法 public AddEntity() { super(); } public AddEntity(long addNum01, long addNum02) { super(); this.addNum01 = addNum01; this.addNum02 = addNum02; this.resNum = addNum01 + addNum02; } // 序列化 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(addNum01); dataOutput.writeLong(addNum02); dataOutput.writeLong(resNum); } // 反序列化 @Override public void readFields(DataInput dataInput) throws IOException { // 注意:反序列化顺序和写序列化顺序一致 this.addNum01 = dataInput.readLong(); this.addNum02 = dataInput.readLong(); this.resNum = dataInput.readLong(); } // 省略Get和Set方法 }
Mapper机制
public class AddMapper extends Mapper<LongWritable, Text, Text, AddEntity> { Text myKey = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 读取行 String line = value.toString(); // 行内容切割 String[] lineArr = line.split(","); // 内容格式处理 String lineNum = lineArr[0]; long addNum01 = Long.parseLong(lineArr[1]); long addNum02 = Long.parseLong(lineArr[2]); myKey.set(lineNum); AddEntity myValue = new AddEntity(addNum01,addNum02); // 输出 context.write(myKey, myValue); } }
Reducer机制
public class AddReducer extends Reducer<Text, AddEntity, Text, AddEntity> { @Override protected void reduce(Text key, Iterable<AddEntity> values, Context context) throws IOException, InterruptedException { long addNum01Sum = 0; long addNum02Sum = 0; // 处理Key相同 for (AddEntity addEntity : values) { addNum01Sum += addEntity.getAddNum01(); addNum02Sum += addEntity.getAddNum02(); } // 最终输出 AddEntity addRes = new AddEntity(addNum01Sum, addNum02Sum); context.write(key, addRes); } }
案例最终结果:
五、源代码地址
GitHub·地址 https://github.com/cicadasmile/big-data-parent GitEE·地址 https://gitee.com/cicadasmile/big-data-parent

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
浅谈 Pull Request 与 Change Request 研发协作模式
说起 PullRequest 相信大部分人都不会陌生,它是由 Github 推出的一种开源协作模式,由于 Gitlab 占据着企业内部私有部署的半壁江山,这种模式也更多的用在企业内部代码审核流程,也就是所谓的 CodeReview。其实还有很多企业和团队会选择 Gerrit 这个工具,Gerrit 提供的是 ChangeRequest 模式,这种模式更具有针对性,对代码审核的粒度也更细,近期有客户需求在 Gitee 上实现类似 ChangeRequest 的需求,所以针对两种模式做一个介绍,探讨两种模式的具体适用场景。 什么是 CodeReview 关于 CodeReview 是什么,Gerrit 官方文档给了一个非常形象的解释,一个软件或者 Feature 的生产过程,如同一首歌的问世前一样,需要歌手反复录制不同的片段达到最佳的效果,要保证每一个片段都令人满意,并且要把所有的片段融合起来,最终目的就是把最好的版本提供给大家品鉴。Queen 乐队有一首《波西米亚狂想曲》,用了3周进行录制,其中有一些片段甚至重复录制了180多次,最终才有了这首经典。 同样的情况也在软件工程领域每天上演着...
- 下一篇
微服务三大利器之限流
背景 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。缓存、降级和限流是保护微服务系统运行稳定性的三大利器。 缓存:提升系统访问速度和增大系统能处理的容量 降级:当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉 限流:解决服务雪崩,级联服务发生阻塞时,及时熔断,防止请求堆积消耗占用系统的线程、IO等资源,造成其他级联服务所在服务器的崩溃 这里我们主要说一下限流,限流的目的应当是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率就可以拒绝服务、等待、降级。 首先,我们需要去了解最基本的两种限流算法。 限流算法 漏桶算法 令牌桶算法 计算器算法 这里主要是提一下,详细了解限流算法请参考下面链接 https://www.cnblogs.com/hopeiscoming/p/12297528.html 限流框架 下面说一下现有流行的限流工具 guava Google的Guava工具包中就提供了一个限流工具类——RateLimiter。RateLimiter是基于“令牌通算法”来实现限流的。 hystrix hystrix主要是通过资源池以及信...
相关文章
文章评论
共有0条评论来说两句吧...