【技术解析】Dolphinscheduler实现MapReduce任务的高效管理
MapReduce是一种编程模型,用于处理和生成大数据集,主要用于大规模数据集(TB级数据规模)的并行运算。本文详细介绍了Dolphinscheduler在MapReduce任务中的应用,包括GenericOptionsParser与args的区别、hadoop jar命令参数的完整解释、MapReduce实例代码,以及如何在Dolphinscheduler中配置和运行MapReduce任务。
GenericOptionsParser vs args区别
GenericOptionsParser 如下:
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); String[] remainingArgs = optionParser.getRemainingArgs();
查看 GenericOptionsParser 源码做了什么?
1、构造方法 public GenericOptionsParser(Configuration conf, String[] args) throws IOException { this(conf, new Options(), args); } 2、点击 this public GenericOptionsParser(Configuration conf, Options options, String[] args) throws IOException { this.conf = conf; parseSuccessful = parseGeneralOptions(options, args); } 3、查看 parseGeneralOptions private boolean parseGeneralOptions(Options opts, String[] args) throws IOException { opts = buildGeneralOptions(opts); CommandLineParser parser = new GnuParser(); boolean parsed = false; try { commandLine = parser.parse(opts, preProcessForWindows(args), true); processGeneralOptions(commandLine); parsed = true; } catch(ParseException e) { LOG.warn("options parsing failed: "+e.getMessage()); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("general options are: ", opts); } return parsed; } 4、看 GnuParser package org.apache.commons.cli; import java.util.ArrayList; import java.util.List; [@Deprecated](https://my.oschina.net/jianhuaw) public class GnuParser extends Parser { ....... } org.apache.commons.cli Parser,是不是有点熟悉?对,请参考 https://segmentfault.com/a/1190000045394541 这篇文章吧 5、看 processGeneralOptions 方法 private void processGeneralOptions(CommandLine line) throws IOException { if (line.hasOption("fs")) { FileSystem.setDefaultUri(conf, line.getOptionValue("fs")); } if (line.hasOption("jt")) { String optionValue = line.getOptionValue("jt"); if (optionValue.equalsIgnoreCase("local")) { conf.set("mapreduce.framework.name", optionValue); } conf.set("yarn.resourcemanager.address", optionValue, "from -jt command line option"); } if (line.hasOption("conf")) { String[] values = line.getOptionValues("conf"); for(String value : values) { conf.addResource(new Path(value)); } } if (line.hasOption('D')) { String[] property = line.getOptionValues('D'); for(String prop : property) { String[] keyval = prop.split("=", 2); if (keyval.length == 2) { conf.set(keyval[0], keyval[1], "from command line"); } } } if (line.hasOption("libjars")) { // for libjars, we allow expansion of wildcards conf.set("tmpjars", validateFiles(line.getOptionValue("libjars"), true), "from -libjars command line option"); //setting libjars in client classpath URL[] libjars = getLibJars(conf); if(libjars!=null && libjars.length>0) { conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader())); Thread.currentThread().setContextClassLoader( new URLClassLoader(libjars, Thread.currentThread().getContextClassLoader())); } } if (line.hasOption("files")) { conf.set("tmpfiles", validateFiles(line.getOptionValue("files")), "from -files command line option"); } if (line.hasOption("archives")) { conf.set("tmparchives", validateFiles(line.getOptionValue("archives")), "from -archives command line option"); } conf.setBoolean("mapreduce.client.genericoptionsparser.used", true); // tokensFile if(line.hasOption("tokenCacheFile")) { String fileName = line.getOptionValue("tokenCacheFile"); // check if the local file exists FileSystem localFs = FileSystem.getLocal(conf); Path p = localFs.makeQualified(new Path(fileName)); localFs.getFileStatus(p); if(LOG.isDebugEnabled()) { LOG.debug("setting conf tokensFile: " + fileName); } UserGroupInformation.getCurrentUser().addCredentials( Credentials.readTokenStorageFile(p, conf)); conf.set("mapreduce.job.credentials.binary", p.toString(), "from -tokenCacheFile command line option"); } } 原理是把 fs、jt、D、libjars、files、archives、tokenCacheFile 相关参数放入到 Hadoop的 Configuration中了,终于清楚 GenericOptionsParser是干什么的了
args呢?如果要使用args,以上这种 fs、jt、D、libjars、files、archives、tokenCacheFile 是需要自己解析的。
Hadoop jar完整参数解释
hadoop jar wordcount.jar org.myorg.WordCount \ -fs hdfs://namenode.example.com:8020 \ -jt resourcemanager.example.com:8032 \ -D mapreduce.job.queuename=default \ -libjars /path/to/dependency1.jar,/path/to/dependency2.jar \ -files /path/to/file1.txt,/path/to/file2.txt \ -archives /path/to/archive1.zip,/path/to/archive2.tar.gz \ -tokenCacheFile /path/to/credential.file \ /input /output
这条命令会:
- 将作业提交到 hdfs://namenode.example.com:8020 文件系统
- 使用 resourcemanager.example.com:8032 作为 YARN ResourceManager
- 提交到 default 队列
- 使用 /path/to/dependency1.jar 和 /path/to/dependency2.jar 作为依赖
- 分发本地文件 /path/to/file1.txt 和 /path/to/file2.txt,注意 : 是本地文件哦
- 解压并分发 /path/to/archive1.zip 和 /path/to/archive2.tar.gz
- 分发凭证文件 /path/to/credential.file
MR实例
WordCount经典示例
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private IntWritable one = new IntWritable(1); private Text word = new Text(); [@Override](https://my.oschina.net/u/1162528) protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\\s+"); for (String field : fields) { word.set(field); context.write(word, one); } } } public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); [@Override](https://my.oschina.net/u/1162528) protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public class WCJob { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // TODO 如果要是本地访问远程的hdfs,需要指定hdfs的根路径,否则只能访问本地的文件系统 // conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020"); GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); String[] remainingArgs = optionParser.getRemainingArgs(); for (String arg : args) { System.out.println("arg :" + arg); } for (String remainingArg : remainingArgs) { System.out.println("remainingArg :" + remainingArg); } if (remainingArgs.length < 2) { throw new RuntimeException("input and output path must set."); } Path outputPath = new Path(remainingArgs[1]); FileSystem fileSystem = FileSystem.get(conf); boolean exists = fileSystem.exists(outputPath); // 如果目标目录存在,则删除 if (exists) { fileSystem.delete(outputPath, true); } Job job = Job.getInstance(conf, "MRWordCount"); job.setJarByClass(WCJob.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); FileInputFormat.addInputPath(job, new Path(remainingArgs[0])); FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
文件分发
public class ConfigMapper extends Mapper<LongWritable, Text, Text, NullWritable> { private List<String> whiteList = new ArrayList<>(); private Text text = new Text(); [@Override](https://my.oschina.net/u/1162528) protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 获取作业提交时传递的文件 URI[] files = context.getCacheFiles(); if (files != null && files.length > 0) { // 读取文件内容 File configFile = new File("white.txt"); // 文件名要与传递的文件名保持一致 try (BufferedReader reader = new BufferedReader(new FileReader(configFile))){ String line = null; while ((line = reader.readLine()) != null) { whiteList.add(line); } } } } [@Override](https://my.oschina.net/u/1162528) protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] datas = line.split("\\s+"); List<String> whiteDatas = Arrays.stream(datas).filter(data -> whiteList.contains(data)).collect(Collectors.toList()); for (String data : whiteDatas) { text.set(data); context.write(text , NullWritable.get()); } } } public class ConfigJob { public static void main(String[] args) throws Exception { // 设置用户名 System.setProperty("HADOOP_USER_NAME", "root"); Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020"); GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); String[] remainingArgs = optionParser.getRemainingArgs(); if (remainingArgs.length < 2) { throw new RuntimeException("input and output path must set."); } Path outputPath = new Path(remainingArgs[1]); FileSystem fileSystem = FileSystem.get(conf); boolean exists = fileSystem.exists(outputPath); // 如果目标目录存在,则删除 if (exists) { fileSystem.delete(outputPath, true); } Job job = Job.getInstance(conf, "MRConfig"); job.setJarByClass(ConfigJob.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setMapperClass(ConfigMapper.class); FileInputFormat.addInputPath(job, new Path(remainingArgs[0])); FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Dolphinscheduler MR使用
Yarn test队列设置
YARN 的配置目录中找到 capacity-scheduler.xml 文件。通常位于 $HADOOP_HOME/etc/hadoop/ 目录下。
修改 capacity-scheduler.xml
<property> <name>yarn.scheduler.capacity.root.queues</name> <value>default, test</value> </property> <property> <name>yarn.scheduler.capacity.root.test.capacity</name> <value>30</value> </property> <property> <name>yarn.scheduler.capacity.root.test.maximum-capacity</name> <value>50</value> </property> <property> <name>yarn.scheduler.capacity.root.test.user-limit-factor</name> <value>1</value> </property>
刷新队列配置 yarn rmadmin -refreshQueues
流程定义设置
执行结果
离线任务实例
YARN作业展示
源码分析
org.apache.dolphinscheduler.plugin.task.mr.MapReduceArgsUtils#buildArgs
String others = param.getOthers(); // TODO 这里其实就是想说,没有通过 -D mapreduce.job.queuename 形式指定队列,是用页面上直接指定队列名称的,页面上 Yarn队列 输入框 if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) { String yarnQueue = param.getYarnQueue(); if (StringUtils.isNotEmpty(yarnQueue)) { args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue)); } } // TODO 这里就是页面上,选项参数 输入框 // -conf -archives -files -libjars -D if (StringUtils.isNotEmpty(others)) { args.add(others); }
转载自Journey 原文链接:https://segmentfault.com/a/1190000045403915
本文由 白鲸开源科技 提供发布支持!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
vivo 游戏中心包体积优化方案与实践
作者:来自 vivo 互联网大前端团队- Ke Jie 介绍 App 包体积优化的必要性,游戏中心 App 在实际优化过程中的有效措施,包括一些优化建议以及有优化思路。 一、包体积优化的必要性 安装包大小与下载转化率的关系大致是成反比的,即安装包越大,下载转换率就越差。Google 曾在2019的谷歌大会上给出过一个统计结论,包体积体大小每上升6MB,应用下载转化率就会下降1%,在不同地区的表现可能会有所差异。 APK 减少10MB,在不同国家转化率增长 (注:数据来自于 googleplaydev:Shrinking APKs, growing installs) 二、游戏中心 APK 组成 APK 包含以下目录: META-INF/:包含 CERT.SF 、CERT.RSA 签名文件、MANIFEST.MF 清单文件。 assets/:包含应用的资源。 res/:包含未编译到 resources.arsc 中的资源。 lib/:支持对应 CPU 架构的 so 文件。 resources.arsc:资源索引文件。 classes.dex:可以理解的dex文件就是项目代码编译为 cla...
- 下一篇
Rust 的静态网站生成器「GitHub 热点速览」
如果你做过个人博客网站,那么一定对静态网站生成器不陌生。无论是 Ruby 语言的 Jekyll、Go 语言的 Hugo、还是基于 React 的 Gatsby,这些工具都有庞大的用户群体。对于喜欢的人来说,它们是无可替代的神器,而对于不喜欢的人,则可能难以"下咽"。正如俗话所说,"萝卜青菜,各有所爱",没有最好用的工具,只有最适合自己的。 比如,上周热门的开源项目 zola,它的诞生就是因为作者不喜欢 Hugo 的模板引擎,同时为了追求更简洁的使用体验。他选择用 Rust 开发了 zola 这款静态网站生成器,提供独立的可执行文件和更简单易用的模版语言。同样备受关注的还有 dockur/macos,它可以让用户在 Docker 中体验 macOS 系统。用于备份 QQ 空间说说的 GetQzonehistory,虽看似简单,但凭借切中用户痛点和开箱即用的特点,让它在短时间内获得上千 Star 的关注。 本文目录 热门开源项目 1.1 Rust 的静态网站生成器:zola 1.2 Linux 平台的 GDB 图形化增强工具:Seer 1.3 在 Docker 中体验 macOS 系统:m...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8编译安装MySQL8.0.19