精细调度:Apache DolphinScheduler脚本深度解析

news2025/1/22 9:15:43

在现代数据处理和工作流管理中,Apache DolphinScheduler以其灵活性和强大的调度能力受到开发者的广泛欢迎。

file

本文将逐步解析DolphinScheduler的关键脚本,希望能提供一个详尽的操作指南,帮助大家掌握安装、配置和操作的每一步。

建立在./bin/env/下目录的配置文件建立好的前提。

安装流程解析

./install.sh
  • 通过source获取install_env.sh和dolphinscheduler_env.sh中的环境变量,如master、worker的基本信息。
  • 在当前机器上创建安装目录,并给目录授权。
  • 向其他节点发送dolphinscheduler的解压文件
  • 停止所有的服务
  • 删除zk上的dolphinscheduler根节点
  • 启动dolphinscheduler所有的服务。

拷贝文件到工作节点

    workDir=`dirname $0`
    workDir=`cd ${workDir};pwd`

    source ${workDir}/env/install_env.sh
    # 获取workers=${workers:-"ds1:default,ds2:default,ds3:default,ds4:default,ds5:default"}
    # 获取数组
    workersGroup=(${workers//,/ })
    # 顺序取数组中的值
    for workerGroup in ${workersGroup[@]}
    do
    # 比如:ds1:default
      echo $workerGroup;
      # 获取worker的ip
      worker=`echo $workerGroup|awk -F':' '{print $1}'`
      # 获取worker对应ip的组,默认为default
      group=`echo $workerGroup|awk -F':' '{print $2}'`
      # 将ip放置到一个集合
      workerNames+=($worker)
      # 组放到一个集合
      groupNames+=(${group:-default})
    done
    # 获取需要安装的机器ip: ips=${ips:-"ds1,ds2,ds3,ds4,ds5"}
    hostsArr=(${ips//,/ })
    # 开始遍历所有需要安装的机器
    for host in ${hostsArr[@]}
    do
      # 连接目标ip,验证安装目录是否存在,如果不存在,则会进行文件夹的创建,因此,需要事先创建好ssh免密登录
      if ! ssh -o StrictHostKeyChecking=no -p $sshPort $host test -e $installPath; then
      # 创建安装目录 比如:/home/dolphinscheduler/apache-dolphinscheduler
        ssh -o StrictHostKeyChecking=no -p $sshPort $host "sudo mkdir -p $installPath; sudo chown -R $deployUser:$deployUser $installPath"
      fi

      如果当前机器时server-worker的机器
      echo "scp dirs to $host/$installPath starting"
      for i in ${!workerNames[@]}; do
        if [[ ${workerNames[$i]} == $host ]]; then
          workerIndex=$i
          break
        fi
      done

      # 这里表示用给定的组去替换default这个字符串,不过配置文件中,默认是不存在这个值的,暂时不用管
      # set worker groups in application.yaml
      [[ -n ${workerIndex} ]] && sed -i "s/- default/- ${groupNames[$workerIndex]}/" $workDir/../worker-server/conf/application.yaml

    # 将相关的七个文件都拷贝到安装目录下。
      for dsDir in bin master-server worker-server alert-server api-server ui tools
      do
        echo "start to scp $dsDir to $host/$installPath"
        # Use quiet mode to reduce command line output
        scp -q -P $sshPort -r $workDir/../$dsDir  $host:$installPath
      done
      # restore worker groups to default
      [[ -n ${workerIndex} ]] && sed -i "s/- ${groupNames[$workerIndex]}/- default/" $workDir/../worker-server/conf/application.yaml

      echo "scp dirs to $host/$installPath complete"
    done

Zookeeper上根节点的删除

执行命令:

bash ${workDir}/remove-zk-node.sh $zkRoot

具体的脚本细节:

print_usage(){
        printf $"USAGE:$0 rootNode\n"
        exit 1
}
# 如果启动参数的个数不等于1,就会报错,
if [ $# -ne 1 ];then
        print_usage
fi
# 获取zk上的rootNode: /dolphinscheduler
rootNode=$1

# 获取当前 remove-zk-node.sh 脚本的目录,bin
BIN_DIR=`dirname $0`
BIN_DIR=`cd "$BIN_DIR"; pwd`
# 获取dolphin的根目录,可能是安装目录,可能是包目录
DOLPHINSCHEDULER_HOME=$BIN_DIR/..

# 刷新环境变量
source ${BIN_DIR}/env/install_env.sh
source ${BIN_DIR}/env/dolphinscheduler_env.sh

# 获取java环境
export JAVA_HOME=$JAVA_HOME

# 设置配置文件目录,不过不存在配置文件目录
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
# 获取需要的lib包
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/api-server/libs/*

# 下面就是具体的执行命令:
export DOLPHINSCHEDULER_OPTS="-Xmx1g -Xms1g -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 "
export STOP_TIMEOUT=5

CLASS=org.apache.zookeeper.ZooKeeperMain

exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS -server $REGISTRY_ZOOKEEPER_CONNECT_STRING rmr $rootNode"

cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command

# 下面来看下这个具体的执行命令是什么?
/bin/java -Xmx1g -Xms1g -Xss512k 
-XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC 
-XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m 
-XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 
# 类路径下的参数
-classpath /conf:/api-server/libs/* 
# 启动的主要类
org.apache.zookeeper.ZooKeeperMain
# 启动的相关参数,这个是zookeeper自身定义的东西,参数需要查看zookeepeer的类
# 主要是zookeeper的连接信息,主要从 dophinscheduler_env.sh 脚本中获取的变量:export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
-server localhost:2181 rmr /dolphinscheduler

整体启停流程解析

# 一键开启集群所有服务
bash ./bin/start-all.sh
# 一键关闭集群所有服务
bash ./bin/stop-all.sh

启动过程如下:

  • 通过source获取install_env.sh中的变量,及api-server、master-server、worker-server、alert-server这几个dolphin中节点的基本部署信息。
  • 通过ssh在各个节点上使用 dolphinscheduler-daemon.sh 命令对各个服务做启停。
  • 启停顺序为 master-server、worker-server、alert-server、api-server。
  • 启动时最后会通过 status-all.sh 对各个服务的状态做查询。

节点状态查询解析

根据作者本人所了解的,获取服务的状态一般通过两种方式:

  • 启动服务时,将服务的进程id写入到文件中,通常在/var/run/目录中,当然,也可以自定义目录。
  • 通过ps命令获取对应的进程id。

当然,Apache DolphinScheduler也是通过写进程文件pid的方式来获取进程id来查询服务状态和停止服务的。

如何单节点启停以及状态查询

在海豚调度的整个启动,停止,状态查询中,最终所用到的脚本是 dolphinscheduler-daemon.sh

有时候,因为某种原因,可能导致Apache DolphinScheduler集群中某一个服务挂掉,不可能通过start-all.sh命令来操作所有,同时,在集群各个服务的扩缩容中,也需要单节点启动,因此合理使用该脚本就比较重要。

命令的使用规范:

dolphinscheduler-daemon.sh (start|stop|status) <api-server|master-server|worker-server|alert-server|standalone-server>

脚本解析

dolphinscheduler-daemon.sh 脚本

# 这是一个用法的示例
usage="Usage: dolphinscheduler-daemon.sh (start|stop|status) <api-server|master-server|worker-server|alert-server|standalone-server> "

# 携带的参数必须是两个,如果是一个就会报错
# if no args specified, show usage
if [ $# -le 1 ]; then
  echo $usage
  exit 1
fi

# 执行的命令
startStop=$1
# shift相当于是将$2 变成$1,
shift
# 执行的任务类型
command=$1
shift

echo "Begin $startStop $command......"

BIN_DIR=`dirname $0`
BIN_DIR=`cd "$BIN_DIR"; pwd`
# 获取安装路径的家目录,注意,因为执行的时候,cd 到了installPath
DOLPHINSCHEDULER_HOME=`cd "$BIN_DIR/.."; pwd`
# 获取dolphin的环境变量,为下面的环境变量覆盖做帮助
BIN_ENV_FILE="${DOLPHINSCHEDULER_HOME}/bin/env/dolphinscheduler_env.sh"

# 这段话的意思就是使用/bin/env/dolphinscheduler_env.sh配置文件,代替每个服务下配置目录conf下的配置文件
# Overwrite server dolphinscheduler_env.sh in path `<server>/conf/dolphinscheduler_env.sh` when exists
# `bin/env/dolphinscheduler_env.sh` file. User could only change `bin/env/dolphinscheduler_env.sh` instead
# of each server's dolphinscheduler_env.sh when they want to start the server

# 定义了一个函数,具体看后面的使用,覆盖环境变量
function overwrite_server_env() {
  local server=$1
  local server_env_file="${DOLPHINSCHEDULER_HOME}/${server}/conf/dolphinscheduler_env.sh"
  if [ -f "${BIN_ENV_FILE}" ]; then
    echo "Overwrite ${server}/conf/dolphinscheduler_env.sh using bin/env/dolphinscheduler_env.sh."
    cp "${BIN_ENV_FILE}" "${server_env_file}"
  else
    echo "Start server ${server} using env config path ${server_env_file}, because file ${BIN_ENV_FILE} not exists."
  fi
}

# 当前机器的hostname
export HOSTNAME=`hostname`
# 执行命令服务的日志文件
export DOLPHINSCHEDULER_LOG_DIR=$DOLPHINSCHEDULER_HOME/$command/logs
# 设置超时时间
export STOP_TIMEOUT=5

# 创建日志文件夹
if [ ! -d "$DOLPHINSCHEDULER_LOG_DIR" ]; then
  mkdir $DOLPHINSCHEDULER_LOG_DIR
fi

# 定义服务的启动进程文件
pid=$DOLPHINSCHEDULER_HOME/$command/pid

# 进入到服务的主目录
cd $DOLPHINSCHEDULER_HOME/$command
# 服务的运行日志,out日志
if [ "$command" = "api-server" ]; then
  log=$DOLPHINSCHEDULER_HOME/api-server/logs/$command-$HOSTNAME.out
elif [ "$command" = "master-server" ]; then
  log=$DOLPHINSCHEDULER_HOME/master-server/logs/$command-$HOSTNAME.out
elif [ "$command" = "worker-server" ]; then
  log=$DOLPHINSCHEDULER_HOME/worker-server/logs/$command-$HOSTNAME.out
elif [ "$command" = "alert-server" ]; then
  log=$DOLPHINSCHEDULER_HOME/alert-server/logs/$command-$HOSTNAME.out
elif [ "$command" = "standalone-server" ]; then
  log=$DOLPHINSCHEDULER_HOME/standalone-server/logs/$command-$HOSTNAME.out
else
  echo "Error: No command named '$command' was found."
  exit 1
fi

# 定义一个函数,获取服务的当前状态
state=""
function get_server_running_status() {
  state="STOP"
  if [ -f $pid ]; then
    TARGET_PID=`cat $pid`
    if [[ $(ps -p "$TARGET_PID" -o comm=) =~ "bash" ]]; then
      state="RUNNING"
    fi
  fi
}

# 使用case语句,根据情况做启动,停止,状态查看
case $startStop in
  (start)
    # if server is already started, cancel this launch
    # 如果服务已经启动,直接退出启动过程
    get_server_running_status
    if [[ $state == "RUNNING" ]]; then
      echo "$command running as process $TARGET_PID.  Stop it first."
      exit 1
    fi
    # 开始做启动
    echo starting $command, logging to $DOLPHINSCHEDULER_LOG_DIR
    # 覆盖配置文件
    overwrite_server_env "${command}"
    # 执行具体的命令,输入到日志文件,并将标准输出2重定向到标准输出1
    nohup /bin/bash "$DOLPHINSCHEDULER_HOME/$command/bin/start.sh" > $log 2>&1 &
    echo $! > $pid
    ;;

# 停止服务,通过kill命令
  (stop)
      if [ -f $pid ]; then
        TARGET_PID=`cat $pid`
        if kill -0 $TARGET_PID > /dev/null 2>&1; then
          echo stopping $command
          pkill -P $TARGET_PID
          sleep $STOP_TIMEOUT
          if kill -0 $TARGET_PID > /dev/null 2>&1; then
            echo "$command did not stop gracefully after $STOP_TIMEOUT seconds: killing with kill -9"
            pkill -P -9 $TARGET_PID
          fi
        else
          echo no $command to stop
        fi
        rm -f $pid
      else
        echo no $command to stop
      fi
      ;;

# 查询状态
  (status)
    get_server_running_status
    if [[ $state == "STOP" ]]; then
      #  font color - red
      state="[ \033[1;31m $state \033[0m ]"
    else
      # font color - green
      state="[ \033[1;32m $state \033[0m ]"
    fi
    echo -e "$command  $state"
    ;;

  (*)
    echo $usage
    exit 1
    ;;

启动脚本关键点说明

这里主要讲一点关于env中配置目录中的关键点,可以发现在dolphinscheduler_env.sh中有一些数据库方面的配置。如下:

# Database related configuration, set database type, username and password
export DATABASE=${DATABASE:-postgresql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_URL
export SPRING_DATASOURCE_USERNAME
export SPRING_DATASOURCE_PASSWORD

比较了解SpringBoot的同学知道,JAVA的配置一般是来自于Yaml文件中的,因此对于一些初试用的同学对配置可能就比较迷惑。

打开spring的官网: https://docs.spring.io/spring-boot/docs/2.2.9.RELEASE/reference/htmlsingle/#boot-features-external-config, 我们可以看到有这样的描述:

SpringBoot使用一种非常特殊的PropertySource顺序,旨在允许合理地覆盖值。按以下顺序考虑属性:

  • $HOME/.config/spring-boot当 devtools 处于活动状态时,文件夹中的Devtools全局设置属性。
  • @TestPropertySource对你的测试进行注释。
  • properties测试的属性。可用于测试应用程序的特定部分的测试@SpringBootTest注释。
  • 命令行参数。
  • 来自SPRING_APPLICATION_JSON(嵌入环境变量或系统属性中的内联 JSON)的属性。
  • ServletConfig初始化参数。
  • ServletContext初始化参数。
  • JNDI 属性来自java:comp/env.
  • Java 系统属性 ( System.getProperties())。
  • 操作系统环境变量。
  • ARandomValuePropertySource仅在 中具有属性random.*。
  • 打包的 jar(和 YAML 变体)之外的特定于配置文件的应用程序属性application-{profile}.properties。
  • 特定于配置文件的应用程序属性打包在 jar(application-{profile}.properties和 YAML 变体)内。
  • 打包的 jar(和 YAML 变体)之外的应用程序属性application.properties。
  • 打包在 jar 内的应用程序属性application.properties(和 YAML 变体)。
  • @PropertySource类上的注释@Configuration。Environment请注意,在刷新应用程序上下文之前,不会将此类属性源添加到中。现在配置某些属性为时已晚,例如在刷新开始之前读取的logging.和。spring.main.
  • 默认属性(由设置指定SpringApplication.setDefaultProperties)。

其中就有操作系统环境变量,而使用方式就是大写和下划线作为分隔符,具体细节大家看链接的官网就明白了。

通过以上深入的脚本解析,开发者应能更加熟练地操纵Apache DolphinScheduler,从而提升数据工作流的效率和稳定性。随着技术的不断进步,了解并掌握这些基本的脚本操作对于保持技术竞争力是至关重要的。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1495681.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于机器学习的垃圾分类

1绪论 1.1问题背景 垃圾分类有减少环境污染、节省土地资源、再生资源的利用、提高民众价值观念等的好处&#xff0c;在倡导绿色生活&#xff0c;注重环境保护的今天&#xff0c;正确的垃圾分类和处理对我们的生态环境显得尤为重要。 在国外很多国家&#xff0c;经过了几十年…

解决WordPress更新插件或者更新版本报WordPress 需要访问您网页服务器的权限的问题

文章目录 前言一、原因二、解决步骤总结 前言 当对WordPress的插件或者版本进行更新时报错&#xff1a;要执行请求的操作&#xff0c;WordPress 需要访问您网页服务器的权限。 请输入您的 FTP 登录凭据以继续。 如果您忘记了您的登录凭据&#xff08;如用户名、密码&#xff09…

JAVA 用二分法查找数组中是否存在某个值

二分法查找的概念 二分查找也称折半查找&#xff08;Binary Search&#xff09;&#xff0c;它是一种效率较高的查找方法。首先&#xff0c;将表中间位置记录的关键字与查找关键字比较&#xff0c;如果两者相等&#xff0c;则查找成功&#xff1b;否则利用中间位置记录将表分成…

每日一题——LeetCode1588.所有奇数长度子数组的和

方法一 暴力循环 遍历数组的每一个元素&#xff0c;找寻该元素所有连续长度为奇数的可能&#xff0c;累加奇数长度区间内的所有元素。 比如对于[1,4,2,5,3] &#xff0c;对于第一个元素1&#xff0c;有[1]、[1,4,2]、[1,4,2,5,3]这三种可能&#xff0c;对于第二个元素4&#x…

STP---生成树协议

STP的作用 a)Stp通过阻塞端口来消除环路&#xff0c;并能够实现链路备份目的 b)消除了广播风暴 c)物理链路冗余&#xff0c;网络变成了层次化结构的网络 STP操作 选举一个根桥每个非根交换机选举一个根端口每个网段选举一个指定端口阻塞非根&#xff0c;非指定端口 STP--生成树…

Java 数据结构之链表

public ListNode getIntersectionNode(ListNode headA, ListNode headB) {if (headA null || headB null) return null;ListNode pA headA, pB headB;while (pA ! pB) {pA pA null ? headB : pA.next;pB pB null ? headA : pB.next;}return pA;} public ListNode rev…

Linux cgrpup技术解析和验证测试

Linux cgrpup技术解析和验证测试 1. cgroup技术解析和分类1.1. 相关概念介绍1.2 cgroup子系统 2. cgroup子系统详解2.1 cpu子系统2.2 cpuacct子系统2.3 cpuset子系统2.4 memory子系统2.5 blkio子系统2.6 ns子系统 3. cgroup使用3.1 通用使用流程3.1.1 限制进程的cpu资源3.1.2 绑…

【开源项目】ollama:本地部署大模型

文章目录 1. 安装2. 使用体验2.1. 运行llama22.2. 运行llama2-chinese 项目地址&#xff1a;Github - ollama/ollama 注意&#xff1a;项目对硬盘容量、内存要求比较高。 1. 安装 从Github项目的最新release下载安装包&#xff0c;点击运行即可。 2. 使用体验 2.1. 运行ll…

中小企业如何降低网络攻击和数据泄露的风险?

德迅云安全收集了Bleeping Computer 网站消息&#xff0c; Arctic Wolf 表示 Akira 勒索软件组织的攻击目标瞄准了中小型企业&#xff0c;自 2023 年 3 月以来&#xff0c;该团伙成功入侵了多家组织&#xff0c;索要的赎金从 20 万美元到 400 多万美元不等&#xff0c;如果受害…

基于OpenCV的图形分析辨认05(补充)

目录 一、前言 二、实验内容 三、实验过程 一、前言 编程语言&#xff1a;Python&#xff0c;编程软件&#xff1a;vscode或pycharm&#xff0c;必备的第三方库&#xff1a;OpenCV&#xff0c;numpy&#xff0c;matplotlib&#xff0c;os等等。 关于OpenCV&#xff0c;num…

常见排序算法解析

芝兰生于深林&#xff0c;不以无人而不芳&#xff1b;君子修道立德&#xff0c;不为穷困而改节 文章目录 插入排序直接插入排序希尔排序 选择排序直接选择排序堆排序 交换排序冒泡排序快速排序优化挖坑法前后指针法非递归版 归并排序递归非递归 总结 插入排序 插入排序&#…

论文阅读笔记 | MetaIQA: Deep Meta-learning for No-Reference Image Quality Assessment

文章目录 文章题目发表年限期刊/会议名称论文简要动机主要思想或方法架构实验结果 文章链接&#xff1a;https://doi.org/10.48550/arXiv.2004.05508 文章题目 MetaIQA: Deep Meta-learning for No-Reference Image Quality Assessment 发表年限 2020 期刊/会议名称 Publi…

Unity性能优化篇(八) 导入的模型网格优化设置

模型导入Unity后&#xff0c;可以选中这个模型&#xff0c;在Inspector窗口设置它的属性。下面说的都是可自定义选择优化的地方 Model选择卡: 1.在Model选项卡&#xff0c;启用Mesh Compression可以压缩模型&#xff0c;压缩程度越高&#xff0c;模型精度越低&#xff0c;但是…

Docker前后端项目部署

目录 一、搭建项目部署的局域网 二、redis安装 三、MySQL安装 四、若依后端项目搭建 4.1 使用Dockerfile自定义镜像 五、若依前端项目搭建 一、搭建项目部署的局域网 搭建net-ry局域网&#xff0c;用于部署若依项目 docker network create net-ry --subnet172.68.0.0/1…

网络安全审计是什么意思?与等保测评有什么区别?

网络安全审计和等保测评在信息安全领域中都是非常重要的环节。但不少人对于这两者是傻傻分不清楚&#xff0c;今天我们就来简单聊聊网络安全审计是什么意思&#xff1f;与等保测评有什么区别&#xff1f; 网络安全审计是什么意思&#xff1f; 网络安全审计是通过对网络系统和网…

python 蓝桥杯填空题

文章目录 字母数判断列名&#xff08;进制问题&#xff09;特殊日期大乘积星期几 字母数 由于是填空题&#xff0c;那么寻找的话&#xff0c;就直接让每一个位置都是A,通过计算看看是不是结果大于2022即可 判断列名&#xff08;进制问题&#xff09; 这道题目&#xff0c;我们可…

Linux篇:基础IO

一 预备知识 1. 文件内容属性&#xff0c;内容与属性都是数据&#xff0c;都要在磁盘中保存。 2. 文件分为打开的文件和没打开的文件。 3. 进程在访问一个文件的时候&#xff0c;都是要先打开这个文件。打开文件之前&#xff0c;文件在磁盘&#xff0c;打开文件之后&#xff0…

RocketMQ架构详解

文章目录 概述RocketMQ架构Broker 高可用集群刷盘策略 概述 RocketMQ一个纯java、分布式、队列模型的开源消息中间件&#xff0c;前身是MetaQ&#xff0c;是阿里研发的一个队列模型的消息中间件&#xff0c;后开源给apache基金会成为了apache的顶级开源项目&#xff0c;具有高…

Mybatis-Spring | Mybatis与Spring的“整合“

目录 : 一、配置环境1. 整合环境需导入的JAR :Spring框架所需JARMybatis框架所需JARMyBatis与Spring整合的中间JAR数据库驱动JAR包数据源所需JAR包 &#xff08;下面的例子中 : 用的不是这个数据源&#xff09; 2. 编写“配置文件” 和 “.properties文件” ( 只是概述&#xf…

mysql从旧表 取出部分列并保存到新表几种方式介绍

在MySQL中&#xff0c;从旧表取出部分列并保存到新表有多种方式&#xff0c;主要包括以下几种&#xff1a; 1. 使用INSERT INTO ... SELECT语句&#xff1a; 这是最常用的方法。通过SELECT语句从旧表中选择需要的数据&#xff0c;然后使用INSERT INTO语句将数据…