【技术解析】Dolphinscheduler实现MapReduce任务的高效管理
MapReduce是一种编程模型,用于处理和生成大数据集,主要用于大规模数据集(TB级数据规模)的并行运算。本文详细介绍了Dolphinscheduler在MapReduce任务中的应用,包括GenericOptionsParser与args的区别、hadoop jar命令参数的完整解释、MapReduce实例代码,以及如何在Dolphinscheduler中配置和运行MapReduce任务。
GenericOptionsParser vs args区别
GenericOptionsParser 如下:
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
查看 GenericOptionsParser 源码做了什么?
1、构造方法
public GenericOptionsParser(Configuration conf, String[] args)
throws IOException {
this(conf, new Options(), args);
}
2、点击 this
public GenericOptionsParser(Configuration conf,
Options options, String[] args) throws IOException {
this.conf = conf;
parseSuccessful = parseGeneralOptions(options, args);
}
3、查看 parseGeneralOptions
private boolean parseGeneralOptions(Options opts, String[] args)
throws IOException {
opts = buildGeneralOptions(opts);
CommandLineParser parser = new GnuParser();
boolean parsed = false;
try {
commandLine = parser.parse(opts, preProcessForWindows(args), true);
processGeneralOptions(commandLine);
parsed = true;
} catch(ParseException e) {
LOG.warn("options parsing failed: "+e.getMessage());
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("general options are: ", opts);
}
return parsed;
}
4、看 GnuParser
package org.apache.commons.cli;
import java.util.ArrayList;
import java.util.List;
[@Deprecated](https://my.oschina.net/jianhuaw)
public class GnuParser extends Parser {
.......
}
org.apache.commons.cli Parser,是不是有点熟悉?对,请参考 https://segmentfault.com/a/1190000045394541 这篇文章吧
5、看 processGeneralOptions 方法
private void processGeneralOptions(CommandLine line) throws IOException {
if (line.hasOption("fs")) {
FileSystem.setDefaultUri(conf, line.getOptionValue("fs"));
}
if (line.hasOption("jt")) {
String optionValue = line.getOptionValue("jt");
if (optionValue.equalsIgnoreCase("local")) {
conf.set("mapreduce.framework.name", optionValue);
}
conf.set("yarn.resourcemanager.address", optionValue,
"from -jt command line option");
}
if (line.hasOption("conf")) {
String[] values = line.getOptionValues("conf");
for(String value : values) {
conf.addResource(new Path(value));
}
}
if (line.hasOption('D')) {
String[] property = line.getOptionValues('D');
for(String prop : property) {
String[] keyval = prop.split("=", 2);
if (keyval.length == 2) {
conf.set(keyval[0], keyval[1], "from command line");
}
}
}
if (line.hasOption("libjars")) {
// for libjars, we allow expansion of wildcards
conf.set("tmpjars",
validateFiles(line.getOptionValue("libjars"), true),
"from -libjars command line option");
//setting libjars in client classpath
URL[] libjars = getLibJars(conf);
if(libjars!=null && libjars.length>0) {
conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
Thread.currentThread().setContextClassLoader(
new URLClassLoader(libjars,
Thread.currentThread().getContextClassLoader()));
}
}
if (line.hasOption("files")) {
conf.set("tmpfiles",
validateFiles(line.getOptionValue("files")),
"from -files command line option");
}
if (line.hasOption("archives")) {
conf.set("tmparchives",
validateFiles(line.getOptionValue("archives")),
"from -archives command line option");
}
conf.setBoolean("mapreduce.client.genericoptionsparser.used", true);
// tokensFile
if(line.hasOption("tokenCacheFile")) {
String fileName = line.getOptionValue("tokenCacheFile");
// check if the local file exists
FileSystem localFs = FileSystem.getLocal(conf);
Path p = localFs.makeQualified(new Path(fileName));
localFs.getFileStatus(p);
if(LOG.isDebugEnabled()) {
LOG.debug("setting conf tokensFile: " + fileName);
}
UserGroupInformation.getCurrentUser().addCredentials(
Credentials.readTokenStorageFile(p, conf));
conf.set("mapreduce.job.credentials.binary", p.toString(),
"from -tokenCacheFile command line option");
}
}
原理是把 fs、jt、D、libjars、files、archives、tokenCacheFile 相关参数放入到 Hadoop的 Configuration中了,终于清楚 GenericOptionsParser是干什么的了
args呢?如果要使用args,以上这种 fs、jt、D、libjars、files、archives、tokenCacheFile 是需要自己解析的。
Hadoop jar完整参数解释
hadoop jar wordcount.jar org.myorg.WordCount \
-fs hdfs://namenode.example.com:8020 \
-jt resourcemanager.example.com:8032 \
-D mapreduce.job.queuename=default \
-libjars /path/to/dependency1.jar,/path/to/dependency2.jar \
-files /path/to/file1.txt,/path/to/file2.txt \
-archives /path/to/archive1.zip,/path/to/archive2.tar.gz \
-tokenCacheFile /path/to/credential.file \
/input /output
这条命令会:
- 将作业提交到 hdfs://namenode.example.com:8020 文件系统
- 使用 resourcemanager.example.com:8032 作为 YARN ResourceManager
- 提交到 default 队列
- 使用 /path/to/dependency1.jar 和 /path/to/dependency2.jar 作为依赖
- 分发本地文件 /path/to/file1.txt 和 /path/to/file2.txt,注意 : 是本地文件哦
- 解压并分发 /path/to/archive1.zip 和 /path/to/archive2.tar.gz
- 分发凭证文件 /path/to/credential.file
MR实例
WordCount经典示例
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
private Text word = new Text();
[@Override](https://my.oschina.net/u/1162528)
protected void map(LongWritable key,
Text value,
Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\\s+");
for (String field : fields) {
word.set(field);
context.write(word, one);
}
}
}
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
[@Override](https://my.oschina.net/u/1162528)
protected void reduce(Text key,
Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public class WCJob {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// TODO 如果要是本地访问远程的hdfs,需要指定hdfs的根路径,否则只能访问本地的文件系统
// conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
for (String arg : args) {
System.out.println("arg :" + arg);
}
for (String remainingArg : remainingArgs) {
System.out.println("remainingArg :" + remainingArg);
}
if (remainingArgs.length < 2) {
throw new RuntimeException("input and output path must set.");
}
Path outputPath = new Path(remainingArgs[1]);
FileSystem fileSystem = FileSystem.get(conf);
boolean exists = fileSystem.exists(outputPath);
// 如果目标目录存在,则删除
if (exists) {
fileSystem.delete(outputPath, true);
}
Job job = Job.getInstance(conf, "MRWordCount");
job.setJarByClass(WCJob.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
文件分发
public class ConfigMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private List<String> whiteList = new ArrayList<>();
private Text text = new Text();
[@Override](https://my.oschina.net/u/1162528)
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 获取作业提交时传递的文件
URI[] files = context.getCacheFiles();
if (files != null && files.length > 0) {
// 读取文件内容
File configFile = new File("white.txt"); // 文件名要与传递的文件名保持一致
try (BufferedReader reader = new BufferedReader(new FileReader(configFile))){
String line = null;
while ((line = reader.readLine()) != null) {
whiteList.add(line);
}
}
}
}
[@Override](https://my.oschina.net/u/1162528)
protected void map(LongWritable key,
Text value,
Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] datas = line.split("\\s+");
List<String> whiteDatas = Arrays.stream(datas).filter(data -> whiteList.contains(data)).collect(Collectors.toList());
for (String data : whiteDatas) {
text.set(data);
context.write(text , NullWritable.get());
}
}
}
public class ConfigJob {
public static void main(String[] args) throws Exception {
// 设置用户名
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
if (remainingArgs.length < 2) {
throw new RuntimeException("input and output path must set.");
}
Path outputPath = new Path(remainingArgs[1]);
FileSystem fileSystem = FileSystem.get(conf);
boolean exists = fileSystem.exists(outputPath);
// 如果目标目录存在,则删除
if (exists) {
fileSystem.delete(outputPath, true);
}
Job job = Job.getInstance(conf, "MRConfig");
job.setJarByClass(ConfigJob.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(ConfigMapper.class);
FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Dolphinscheduler MR使用
Yarn test队列设置
YARN 的配置目录中找到 capacity-scheduler.xml 文件。通常位于 $HADOOP_HOME/etc/hadoop/ 目录下。
修改 capacity-scheduler.xml
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default, test</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.test.capacity</name>
<value>30</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.test.maximum-capacity</name>
<value>50</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.test.user-limit-factor</name>
<value>1</value>
</property>
刷新队列配置 yarn rmadmin -refreshQueues
流程定义设置
执行结果
离线任务实例
YARN作业展示
源码分析
org.apache.dolphinscheduler.plugin.task.mr.MapReduceArgsUtils#buildArgs
String others = param.getOthers();
// TODO 这里其实就是想说,没有通过 -D mapreduce.job.queuename 形式指定队列,是用页面上直接指定队列名称的,页面上 Yarn队列 输入框
if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {
String yarnQueue = param.getYarnQueue();
if (StringUtils.isNotEmpty(yarnQueue)) {
args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
}
}
// TODO 这里就是页面上,选项参数 输入框
// -conf -archives -files -libjars -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
转载自Journey 原文链接:https://segmentfault.com/a/1190000045403915
本文由 白鲸开源科技 提供发布支持!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
vivo 游戏中心包体积优化方案与实践
作者:来自 vivo 互联网大前端团队- Ke Jie 介绍 App 包体积优化的必要性,游戏中心 App 在实际优化过程中的有效措施,包括一些优化建议以及有优化思路。 一、包体积优化的必要性 安装包大小与下载转化率的关系大致是成反比的,即安装包越大,下载转换率就越差。Google 曾在2019的谷歌大会上给出过一个统计结论,包体积体大小每上升6MB,应用下载转化率就会下降1%,在不同地区的表现可能会有所差异。 APK 减少10MB,在不同国家转化率增长 (注:数据来自于 googleplaydev:Shrinking APKs, growing installs) 二、游戏中心 APK 组成 APK 包含以下目录: META-INF/:包含 CERT.SF 、CERT.RSA 签名文件、MANIFEST.MF 清单文件。 assets/:包含应用的资源。 res/:包含未编译到 resources.arsc 中的资源。 lib/:支持对应 CPU 架构的 so 文件。 resources.arsc:资源索引文件。 classes.dex:可以理解的dex文件就是项目代码编译为 cla...
-
下一篇
Rust 的静态网站生成器「GitHub 热点速览」
如果你做过个人博客网站,那么一定对静态网站生成器不陌生。无论是 Ruby 语言的 Jekyll、Go 语言的 Hugo、还是基于 React 的 Gatsby,这些工具都有庞大的用户群体。对于喜欢的人来说,它们是无可替代的神器,而对于不喜欢的人,则可能难以"下咽"。正如俗话所说,"萝卜青菜,各有所爱",没有最好用的工具,只有最适合自己的。 比如,上周热门的开源项目 zola,它的诞生就是因为作者不喜欢 Hugo 的模板引擎,同时为了追求更简洁的使用体验。他选择用 Rust 开发了 zola 这款静态网站生成器,提供独立的可执行文件和更简单易用的模版语言。同样备受关注的还有 dockur/macos,它可以让用户在 Docker 中体验 macOS 系统。用于备份 QQ 空间说说的 GetQzonehistory,虽看似简单,但凭借切中用户痛点和开箱即用的特点,让它在短时间内获得上千 Star 的关注。 本文目录 热门开源项目 1.1 Rust 的静态网站生成器:zola 1.2 Linux 平台的 GDB 图形化增强工具:Seer 1.3 在 Docker 中体验 macOS 系统:m...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- MySQL数据库在高并发下的优化方案
- CentOS关闭SELinux安全模块
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Dcoker安装(在线仓库),最新的服务器搭配容器使用