高可用Hadoop平台-启航
1.概述
在上篇博客中,我们搭建了《配置高可用Hadoop平台》, 接下来我们就可以驾着Hadoop这艘巨轮在大数据的海洋中遨游了。工欲善其事,必先利其器。是的,没错;我们开发需要有开发工具(IDE);本篇文章, 我打算讲解如何搭建和使用开发环境,以及编写和讲解WordCount这个例子,给即将在Hadoop的海洋驰骋的童鞋入个门。上次,我在《网站日志统计案例分析与实现》中说会将源码放到Github,后来,我考虑了下,决定将《高可用的Hadoop平台》做一个系列,后面基于这个平台,我会单独写一篇来赘述具体的实现过程,和在实现过程中遇到的一些问题,以及解决这些问题的方案。下面我们开始今天的启航。
2.启航
IDE:JBoss Developer Studio 8.0.0.GA (Eclipse的升级版,Redhat公司出的)
JDK:1.7(或1.8)
Hadoop2x-eclipse-plugin:这个插件,本地单元测试或自己做学术研究比较好用
插件下载地址:https://github.com/smartdengjie/hadoop2x-eclipse-plugin
由于JBoss Developer Studio 8基本适合于Retina屏,所以,我们这里直接使用JBoss Developer Studio 8,JBoss Developer Studio 7对Retina屏的支持不是很完美,这里就不赘述了。
附上一张IDE的截图:
2.1安装插件
下面我们开始安装插件,首先展示首次打开的界面,如下图所示:
然后,我们到上面给的Github的地址,clone整个工程,里面有编译好的jar和源码,可自行选择(使用已存在的和自己编译对应的版本),这里我直接使用编译好的版本。我们将jar放到IDE的plugins目录下,如下图所示:
接着,我们重启IDE,界面出现如下图所示的,即表示插件添加成功,若没有,查看IDE的启动日志,根据异常日志定位出原因。
2.2设置Hadoop插件
配置信息如下所示(已在图中说明):
添加本地的hadoop源码目录:
到这里,IDE和插件的搭建就完成了,下面我们进入一段简单的开发,hadoop的源码中提供了许多example让我学习,这里我以WordCount为例子来说明:
3.WordCount
首先我们看下hadoop的源码文件目录,如下图所示:
3.1源码解读
package cn.hdfs.mr.example; import java.io.IOException; import java.util.Random; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hdfs.utils.ConfigUtils; /** * * @author dengjie * @date 2015年03月13日 * @description Wordcount的例子是一个比较经典的mapreduce例子,可以叫做Hadoop版的hello world。 * 它将文件中的单词分割取出,然后shuffle,sort(map过程),接着进入到汇总统计 * (reduce过程),最后写道hdfs中。基本流程就是这样。 */ public class WordCount { private static Logger log = LoggerFactory.getLogger(WordCount.class); public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); /* * 源文件:a b b * * map之后: * * a 1 * * b 1 * * b 1 */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString());// 整行读取 while (itr.hasMoreTokens()) { word.set(itr.nextToken());// 按空格分割单词 context.write(word, one);// 每次统计出来的单词+1 } } } /* * reduce之前: * * a 1 * * b 1 * * b 1 * * reduce之后: * * a 1 * * b 2 */ public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { Configuration conf1 = new Configuration(); Configuration conf2 = new Configuration(); long random1 = new Random().nextLong();// 重定下输出目录1 long random2 = new Random().nextLong();// 重定下输出目录2 log.info("random1 -> " + random1 + ",random2 -> " + random2); Job job1 = new Job(conf1, "word count1"); job1.setJarByClass(WordCount.class); job1.setMapperClass(TokenizerMapper.class);// 指定Map计算的类 job1.setCombinerClass(IntSumReducer.class);// 合并的类 job1.setReducerClass(IntSumReducer.class);// Reduce的类 job1.setOutputKeyClass(Text.class);// 输出Key类型 job1.setOutputValueClass(IntWritable.class);// 输出值类型 Job job2 = new Job(conf2, "word count2"); job2.setJarByClass(WordCount.class); job2.setMapperClass(TokenizerMapper.class); job2.setCombinerClass(IntSumReducer.class); job2.setReducerClass(IntSumReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); // FileInputFormat.addInputPath(job, new // Path(String.format(ConfigUtils.HDFS.WORDCOUNT_IN, "test.txt"))); // 指定输入路径 FileInputFormat.addInputPath(job1, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_IN, "word"))); // 指定输出路径 FileOutputFormat.setOutputPath(job1, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_OUT, random1))); FileInputFormat.addInputPath(job2, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_IN, "word"))); FileOutputFormat.setOutputPath(job2, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_OUT, random2))); boolean flag1 = job1.waitForCompletion(true);// 执行完MR任务后退出应用 boolean flag2 = job1.waitForCompletion(true); if (flag1 && flag2) { System.exit(0); } else { System.exit(1); } } }
4.总结
这篇文章就和大家分享到这里,如果在研究的过程有什么问题,可以加群讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MapReduce 不适合处理实时数据的原因剖析
1.概述 Hadoop已被公认为大数据分析领域无可争辩的王者,它专注与批处理。这种模型对许多情形(比如:为网页建立索引)已经足够,但还存在其他一 些使用模型,它们需要来自高度动态的来源的实时信息。为了解决这个问题,就得借助Twitter推出得Storm。Storm不处理静态数据,但它处理预 计会连续的流数据。考虑到Twitter用户每天生成1.4亿条推文,那么就很容易看到此技术的巨大用途。 但Storm不只是一个传统的大数据分析系统:它是复杂事件处理(CEP)系统的一个示例。CEP系统通常分类为计算和面向检测,其中每个系统都是通过用户定义的算法在Storm中实现。举例而言,CEP可用于识别事件洪流中有意义的事件,然后实时的处理这些事件。 2.为什么Hadoop不适合实时计算 这里说的不适合,是一个相对的概念。如果业务对时延要求较低,那么这个 问题就不存在了;但事实上企业中的有些业务要求是对时延有高要求的。下面我 就来说说: 2.1时延 Storm 的网络直传与内存计算,其时延必然比 Hadoop 的 HDFS 传输低得多;当计算模型比较适合流式时,Storm 的流试处理,省去了批处理...
- 下一篇
Hadoop项目实战-用户行为分析之应用概述(二)
1.概述 本课程的视频教程地址:《项目整体概述》 本节给大家分享的主题如下图所示: 下面我开始为大家分享第二节的内容——《项目整体概述》,下面开始今天的分享内容。 2.内容 从本节开始,我们将进入到Hadoop项目的整体概述一节学习,本节课程为大家介绍的主要知识点有一下内容,如下图所示: 下面,我们首先来看看项目的整体流程,其流程如下图所示: 项目流程可以分为4个模块,他们分别是数据收集,集群存储,分析计算和结果处理。 下面我分别为大家讲解这4个模块的作用。 我们知道,在做统计时,数据源是前提,有了数据源我们才能在此基础上做相应的计算和分析。 收集数据一般都有专门的集群去负责收集这方面的工作。 在完成收集工作后,我们需要将这些文件集中起来,这里存储采用的是分布式文件系统(HDFS)。我们将收集的数据 按一定的规则分类,并存储在指定的HDFS文件系统中。从收集到存储,数据源的准备阶段就算完成了。接着,我们可以对数据源进行相关指标的分析与计算,在 Hadoop 2.x 版本后编程模型有了良好的拓展,除了支持MapReduce,还支持其以外的模型,如:Spark。另外,还有Hive,Pig,...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Windows10,CentOS7,CentOS8安装Nodejs环境
- MySQL8.0.19开启GTID主从同步CentOS8
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- 设置Eclipse缩进为4个空格,增强代码规范