【技术解析】Dolphinscheduler实现MapReduce任务的高效管理

news2025/1/20 12:01:53

MapReduce是一种编程模型,用于处理和生成大数据集,主要用于大规模数据集(TB级数据规模)的并行运算。本文详细介绍了Dolphinscheduler在MapReduce任务中的应用,包括GenericOptionsParser与args的区别、hadoop jar命令参数的完整解释、MapReduce实例代码,以及如何在Dolphinscheduler中配置和运行MapReduce任务。

GenericOptionsParser vs args区别

GenericOptionsParser 如下:

GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();

查看 GenericOptionsParser 源码做了什么?

1、构造方法
public GenericOptionsParser(Configuration conf, String[] args) 
      throws IOException {
    this(conf, new Options(), args); 
}

2、点击 this
public GenericOptionsParser(Configuration conf,
      Options options, String[] args) throws IOException {
    this.conf = conf;
    parseSuccessful = parseGeneralOptions(options, args);
}

3、查看 parseGeneralOptions
private boolean parseGeneralOptions(Options opts, String[] args)
      throws IOException {
    opts = buildGeneralOptions(opts);
    CommandLineParser parser = new GnuParser();
    boolean parsed = false;
    try {
      commandLine = parser.parse(opts, preProcessForWindows(args), true);
      processGeneralOptions(commandLine);
      parsed = true;
    } catch(ParseException e) {
      LOG.warn("options parsing failed: "+e.getMessage());

      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp("general options are: ", opts);
    }
    return parsed;
}

4、看 GnuParser 
package org.apache.commons.cli;

import java.util.ArrayList;
import java.util.List;

@Deprecated
public class GnuParser extends Parser {
.......
}

org.apache.commons.cli Parser,是不是有点熟悉?对,请参考 https://segmentfault.com/a/1190000045394541 这篇文章吧

5、看 processGeneralOptions 方法
private void processGeneralOptions(CommandLine line) throws IOException {
    if (line.hasOption("fs")) {
      FileSystem.setDefaultUri(conf, line.getOptionValue("fs"));
    }

    if (line.hasOption("jt")) {
      String optionValue = line.getOptionValue("jt");
      if (optionValue.equalsIgnoreCase("local")) {
        conf.set("mapreduce.framework.name", optionValue);
      }

      conf.set("yarn.resourcemanager.address", optionValue, 
          "from -jt command line option");
    }
    if (line.hasOption("conf")) {
      String[] values = line.getOptionValues("conf");
      for(String value : values) {
        conf.addResource(new Path(value));
      }
    }

    if (line.hasOption('D')) {
      String[] property = line.getOptionValues('D');
      for(String prop : property) {
        String[] keyval = prop.split("=", 2);
        if (keyval.length == 2) {
          conf.set(keyval[0], keyval[1], "from command line");
        }
      }
    }

    if (line.hasOption("libjars")) {
      // for libjars, we allow expansion of wildcards
      conf.set("tmpjars",
               validateFiles(line.getOptionValue("libjars"), true),
               "from -libjars command line option");
      //setting libjars in client classpath
      URL[] libjars = getLibJars(conf);
      if(libjars!=null && libjars.length>0) {
        conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
        Thread.currentThread().setContextClassLoader(
            new URLClassLoader(libjars, 
                Thread.currentThread().getContextClassLoader()));
      }
    }
    if (line.hasOption("files")) {
      conf.set("tmpfiles", 
               validateFiles(line.getOptionValue("files")),
               "from -files command line option");
    }
    if (line.hasOption("archives")) {
      conf.set("tmparchives", 
                validateFiles(line.getOptionValue("archives")),
                "from -archives command line option");
    }
    conf.setBoolean("mapreduce.client.genericoptionsparser.used", true);

    // tokensFile
    if(line.hasOption("tokenCacheFile")) {
      String fileName = line.getOptionValue("tokenCacheFile");
      // check if the local file exists
      FileSystem localFs = FileSystem.getLocal(conf);
      Path p = localFs.makeQualified(new Path(fileName));
      localFs.getFileStatus(p);
      if(LOG.isDebugEnabled()) {
        LOG.debug("setting conf tokensFile: " + fileName);
      }
      UserGroupInformation.getCurrentUser().addCredentials(
          Credentials.readTokenStorageFile(p, conf));
      conf.set("mapreduce.job.credentials.binary", p.toString(),
               "from -tokenCacheFile command line option");

    }
}

原理是把 fs、jt、D、libjars、files、archives、tokenCacheFile 相关参数放入到 Hadoop的 Configuration中了,终于清楚 GenericOptionsParser是干什么的了

args呢?如果要使用args,以上这种 fs、jt、D、libjars、files、archives、tokenCacheFile 是需要自己解析的。

Hadoop jar完整参数解释

hadoop jar wordcount.jar org.myorg.WordCount \
    -fs hdfs://namenode.example.com:8020 \
    -jt resourcemanager.example.com:8032 \
    -D mapreduce.job.queuename=default \
    -libjars /path/to/dependency1.jar,/path/to/dependency2.jar \
    -files /path/to/file1.txt,/path/to/file2.txt \
    -archives /path/to/archive1.zip,/path/to/archive2.tar.gz \
    -tokenCacheFile /path/to/credential.file \
    /input /output

这条命令会:

  1. 将作业提交到 hdfs://namenode.example.com:8020 文件系统
  2. 使用 resourcemanager.example.com:8032 作为 YARN ResourceManager
  3. 提交到 default 队列
  4. 使用 /path/to/dependency1.jar 和 /path/to/dependency2.jar 作为依赖
  5. 分发本地文件 /path/to/file1.txt 和 /path/to/file2.txt,注意 : 是本地文件哦
  6. 解压并分发 /path/to/archive1.zip 和 /path/to/archive2.tar.gz
  7. 分发凭证文件 /path/to/credential.file

    MR实例

    WordCount经典示例

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key,
                       Text value,
                       Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\s+");
        for (String field : fields) {
            word.set(field);
            context.write(word, one);
        }
    }
}

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key,
                          Iterable<IntWritable> values,
                          Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}


public class WCJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // TODO 如果要是本地访问远程的hdfs,需要指定hdfs的根路径,否则只能访问本地的文件系统
//        conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");

        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();

        for (String arg : args) {
            System.out.println("arg :" + arg);
        }

        for (String remainingArg : remainingArgs) {
            System.out.println("remainingArg :" + remainingArg);
        }

        if (remainingArgs.length < 2) {
            throw new RuntimeException("input and output path must set.");
        }

        Path outputPath = new Path(remainingArgs[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        boolean exists = fileSystem.exists(outputPath);
        // 如果目标目录存在,则删除
        if (exists) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "MRWordCount");
        job.setJarByClass(WCJob.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

文件分发

public class ConfigMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private List<String> whiteList = new ArrayList<>();
    private Text text = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // 获取作业提交时传递的文件
        URI[] files = context.getCacheFiles();

        if (files != null && files.length > 0) {
            // 读取文件内容
            File configFile = new File("white.txt"); // 文件名要与传递的文件名保持一致
            try (BufferedReader reader = new BufferedReader(new FileReader(configFile))){
                String line = null;
                while ((line = reader.readLine()) != null) {
                    whiteList.add(line);
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key,
                       Text value,
                       Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {

        String line = value.toString();
        String[] datas = line.split("\\s+");

        List<String> whiteDatas = Arrays.stream(datas).filter(data -> whiteList.contains(data)).collect(Collectors.toList());

        for (String data : whiteDatas) {
            text.set(data);
            context.write(text , NullWritable.get());
        }
    }
}


public class ConfigJob {

    public static void main(String[] args) throws Exception {

        // 设置用户名
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();

        conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");

        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();

        if (remainingArgs.length < 2) {
            throw new RuntimeException("input and output path must set.");
        }

        Path outputPath = new Path(remainingArgs[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        boolean exists = fileSystem.exists(outputPath);
        // 如果目标目录存在,则删除
        if (exists) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "MRConfig");
        job.setJarByClass(ConfigJob.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setMapperClass(ConfigMapper.class);
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Dolphinscheduler MR使用

Yarn test队列设置

YARN 的配置目录中找到 capacity-scheduler.xml 文件。通常位于 $HADOOP_HOME/etc/hadoop/ 目录下。

修改 capacity-scheduler.xml

<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default, test</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.test.capacity</name>
    <value>30</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.test.maximum-capacity</name>
    <value>50</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.test.user-limit-factor</name>
    <value>1</value>
</property>

刷新队列配置 yarn rmadmin -refreshQueues

流程定义设置

file

执行结果

file

离线任务实例

file

YARN作业展示

file

源码分析

org.apache.dolphinscheduler.plugin.task.mr.MapReduceArgsUtils#buildArgs

String others = param.getOthers();
// TODO 这里其实就是想说,没有通过 -D mapreduce.job.queuename 形式指定队列,是用页面上直接指定队列名称的,页面上 Yarn队列 输入框
if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {
    String yarnQueue = param.getYarnQueue();
    if (StringUtils.isNotEmpty(yarnQueue)) {
        args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
    }
}

// TODO 这里就是页面上,选项参数 输入框
// -conf -archives -files -libjars -D
if (StringUtils.isNotEmpty(others)) {
    args.add(others);
}

转载自Journey 原文链接:https://segmentfault.com/a/1190000045403915

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243185.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Debezium-MySqlConnectorTask

文章目录 概要整体架构流程技术名词解释技术细节小结 概要 MySqlConnectorTask&#xff0c;用于读取MySQL的二进制日志并生成对应的数据变更事件 整体架构流程 技术名词解释 数据库模式&#xff08;Database Schema&#xff09; 数据库模式是指数据库中数据的组织结构和定义&…

剧本杀门店预约小程序,解锁沉浸式推理体验

一、开发背景 剧本杀作为一种热门娱乐游戏&#xff0c;深受大众的欢迎&#xff0c;但随着市场的快速发展&#xff0c;竞争也在不断加大&#xff0c;对于剧本杀线下商家来说面临着发展创新。 剧本杀线下门店数量目前正在逐渐增加&#xff0c;竞争激烈&#xff0c;而门店的获客…

今天你学C++了吗——C++启航之入门知识

♥♥♥~~~~~~欢迎光临知星小度博客空间~~~~~~♥♥♥ ♥♥♥零星地变得优秀~也能拼凑出星河~♥♥♥ ♥♥♥我们一起努力成为更好的自己~♥♥♥ ♥♥♥如果这一篇博客对你有帮助~别忘了点赞分享哦~♥♥♥ ♥♥♥如果有什么问题可以评论区留言或者私信我哦~♥♥♥ ✨✨✨✨✨✨ 个…

【element-tiptap】Tiptap编辑器核心概念----结构篇

core-concepts 前言&#xff1a;这篇文章来介绍一下 Tiptap 编辑器的一些核心概念 &#xff08;一&#xff09;结构 1、 Schemas 定义文档组成方式。一个文档就是标题、段落以及其他的节点组成的一棵树。 每一个 ProseMirror 的文档都有一个与之相关联的 schema&#xff0c;…

LC69---219存在重复元素(滑动窗口)---Java版

1.题目描述 2.思路 3.代码实现 public class Solution { public boolean containsNearbyDuplicate(int[] nums, int k) {Map<Integer,Integer> m1new HashMap<>();// 1:0, 2:1,3:2,1:3 key存数组的值&#xff0c;value存索引&#xff0c;为getnum[i]做准备&am…

【C++】了解map和set及平衡二叉树和红黑树的原理

目录 ​编辑 一、关联式容器 二、 键值对 三、pair介绍 四、树形结构的关联式容器 4.1 set 4.2 map 4.3 multiset 4.4 multimaps 五、底层结构&#xff08;重点&#xff09; 5.1 AVL 树 5.1.1 AVL树的概念 5.1.2 AVL树节点的定义 5.1.3 AVL树的旋转 5.1.4 AVL树的…

LeetCode 力扣 热题 100道(五)最长回文子串(C++)

最长回文子串 给你一个字符串 s&#xff0c;找到 s 中最长的 回文子串。 回文性 如果字符串向前和向后读都相同&#xff0c;则它满足 回文性 子字符串子字符串 是字符串中连续的 非空 字符序列。 动态规划法 class Solution { public:string longestPalindrome(string s) {i…

Spring Boot汽车资讯:科技与汽车的新融合

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了汽车资讯网站的开发全过程。通过分析汽车资讯网站管理的不足&#xff0c;创建了一个计算机管理汽车资讯网站的方案。文章介绍了汽车资讯网站的系统分析部分&…

逆向攻防世界CTF系列41-EASYHOOK

逆向攻防世界CTF系列41-EASYHOOK 看题目是一个Hook类型的&#xff0c;第一次接触&#xff0c;虽然学过相关理论&#xff0c;可以看我的文章 Hook入门(逆向)-CSDN博客 题解参考&#xff1a;https://www.cnblogs.com/c10udlnk/p/14214057.html和攻防世界逆向高手题之EASYHOOK-…

【网络】HTTP 协议

目录 基本概念基于 HTTP 的系统组成HTTP 的基本性质 HTTP 请求头 & 响应头HTTP 的请求方法HTTP 的返回码HTTP 的 CookieHTTP 缓存 Cache-Control会话HTTP/1.x 的连接管理 基本概念 HTTP&#xff08;Hypertext Transfer Protocol&#xff0c;超文本传输协议&#xff09;是一…

执行flink sql连接clickhouse库

手把手教学&#xff0c;flink connector打通clickhouse大数据库&#xff0c;通过下发flink sql&#xff0c;来使用ck。 组件版本jdk1.8flink1.17.2clickhouse23.12.2.59 1.背景 flink官方不支持clickhouse连接器&#xff0c;工作中难免会用到。 2.方案 利用GitHub大佬提供…

笔记|M芯片MAC (arm64) docker上使用 export / import / commit 构建amd64镜像

很简单的起因&#xff0c;我的东西最终需要跑在amd64上&#xff0c;但是因为mac的架构师arm64&#xff0c;所以直接构建好的代码是没办法跨平台运行的。直接在arm64上pull下来的docker镜像也都是arm64架构。 检查镜像架构&#xff1a; docker inspect 8135f475e221 | grep Arc…

免费送源码:Java+Springboot+MySQL Springboot多租户博客网站的设计 计算机毕业设计原创定制

Springboot多租户博客网站的设计 摘 要 博客网站是当今网络的热点&#xff0c;博客技术的出现使得每个人可以零成本、零维护地创建自己的网络媒体&#xff0c;Blog站点所形成的网状结构促成了不同于以往社区的Blog文化&#xff0c;Blog技术缔造了“博客”文化。本文课题研究的“…

代码随想录第46期 单调栈

这道题主要是单调栈的简单应用 class Solution { public:vector<int> dailyTemperatures(vector<int>& T) {vector<int> result(T.size(),0);stack<int> st;st.push(0);for(int i1;i<T.size();i){if(T[i]<T[st.top()]){st.push(i);}else{wh…

【Three.js基础学习】24. shader patterns

前言 课程回顾: ShaderMaterial 这里用的是着色器材质 所以顶点和片段着色器就不需要像原始着色器那样添加需要的属性 然后写 片段着色器需要属性 &#xff1a; 顶点 属性 -》变化 -》 片段中 顶点中的属性不需要声明 只需要声明传送的变量 例如 varying vec vUv; vUv uv; 补充…

力扣整理版七:二叉树(待更新)

满二叉树&#xff1a;如果一棵二叉树只有度为0的结点和度为2的结点&#xff0c;并且度为0的结点在同一层上&#xff0c;则这棵二叉树为满二叉树。深度为k&#xff0c;有2^k-1个节点的二叉树。 完全二叉树&#xff1a;在完全二叉树中&#xff0c;除了最底层节点可能没填满外&am…

173. 二叉搜索树迭代器【 力扣(LeetCode) 】

文章目录 零、原题链接一、题目描述二、测试用例三、解题思路四、参考代码 零、原题链接 173. 二叉搜索树迭代器 一、题目描述 实现一个二叉搜索树迭代器类BSTIterator &#xff0c;表示一个按中序遍历二叉搜索树&#xff08;BST&#xff09;的迭代器&#xff1a; BSTIterato…

自动驾驶系列—深入解析自动驾驶车联网技术及其应用场景

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

STL序列式容器之list

相较于vector的连续性空间&#xff0c;list相对比较复杂&#xff1b;list内部使用了双向环形链表的方式对数据进行存储&#xff1b;list在增加元素时&#xff0c;采用了精准的方式分配一片空间对数据及附加指针等信息进行存储&#xff1b; list节点定义如下 template<clas…

Element-ui Select选择器自定义搜索方法

效果图 具体实现 <template><div class"home"><el-selectref"currencySelect"v-model"currency"filterable:spellcheck"false"placeholder"请选择":filter-method"handleCurrencyFilter"change&q…