DolphinScheduler自身容错导致的服务器持续崩溃重大问题的排查与解决

news2025/1/13 20:53:57

file

01 问题复现

在DolphinScheduler中有如下一个Shell任务:

current_timestamp() {  
    date +"%Y-%m-%d %H:%M:%S"  
}

TIMESTAMP=$(current_timestamp)
echo $TIMESTAMP
sleep 60

在DolphinScheduler将工作流执行策略设置为并行:

file

定时周期调度设置为10秒一次:

file

将定时调度上线后,会调度执行任务,此时一切正常:

file

此时将Master节点给kill掉,模拟宕机:

$ jps
1979710 AlertServer
1979626 WorkerServer
1979546 MasterServer
1979794 ApiApplicationServer
1980483 Jps
$ kill -9 1979546

去到DolphinScheduler中查看,发现Master已经不存在了:

file

此时观察DolphinScheduler工作流执行,发现其不会继续调度任务执行了,并且所有的任务则会一直执行下去,直到报错。

当过了一段时间后(模拟发现了宕机问题),此时重启DolphinScheduler:

sh bin/stop-all.sh
sh bin/start-all.sh

重启完成后,就会将之前没有执行成功的任务,包括没有执行的调度任务,全部都执行一次:

file

这就有一个致命的问题:如果都是高性能任务的话,就会导致CPU、内存被打满,从而让服务器整个宕机!!!

file

02 多场景测试

  • Master宕机后,重启整个DS:会产生上述问题。
  • Master宕机后,重启相应的Master:会产生上述问题。——有缺陷,官方没有单独的Master后台启动,只有前台启动的脚本,但可以重复执行start-all.sh。
  • Worker宕机后,重启整个DS:不会产生上述问题。——因为Master会持续的调度任务,而Worker宕机后的结果就是调度任务直接失败。
  • Worker宕机后,重启相应的Worker:不会产生上述问题。——有缺陷,官方没有单独的Worker后台启动,只有前台启动的脚本,但可以重复执行start-all.sh。
  • DS整个宕机后,重启整个DS:会产生上述问题。
  • DS使用stop-all.sh停止后,重启整个DS:会产生上述问题。

其核心就是在于Master,只要配置了周期任务,无论Master是宕机还是调用脚本关闭的,其都会产生上述问题。

03 原理分析

DolphinScheduler核心角色:

  • MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。
  • WorkerServer主要负责任务的执行和提供日志服务。WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
  • ApiServer主要负责处理前端UI层的请求。

大致的任务运行流程如下:

  • 在API-Server中创建任务,并将元数据持久化到DB中。

  • 通过手动点击或定时执行生成一个触发工作流执行的Command写入DB。

  • Master消费DB中的Command,开始执行工作流,并将工作流中的任务分发给Worker执行。

  • 当整个工作流执行结束之后,Master结束工作流的执行。

file

参考官网,上述的DolphinScheduler核心任务执行流程可以细化为如下:

file

鉴于任务调度的复杂性,一个大的流程可以划分为小的流程,在主线流程之外还附加了支线流程,下面对执行调度流程拆分进行分析一下,这样更容易理解:

file

在本次问题中,主要关注的就是Command分发流程。其Command分发流程是一个异步分布式生产消费模式。

i. 首先是生产者api-server,会将用户的运行工作流http请求封装成command数据,insert到t_ds_command表中,如下是一个启动工作流实例的command样例(老版本):

{
    "commandType": "START_PROCESS",
    "processDefinitionCode": 14285512555584,
    "executorId": 1,
    "commandParam": "{}",
    "taskDependType": "TASK_POST",
    "failureStrategy": "CONTINUE",
    "warningType": "NONE",
    "startTime": 1723444881372,
    "processInstancePriority": "MEDIUM",
    "updateTime": 1723444881372,
    "workerGroup": "default",
    "tenantCode": "default",
    "environmentCode": -1,
    "dryRun": 0,
    "processInstanceId": 0,
    "processDefinitionVersion": 1,
    "testFlag": 0
}

ii.其次是消费者,master server中的MasterSchedulerBootstrap loop程序, MasterSchedulerBootstrap使用ZK分配到自己的slot,从t_ds_command表中select属于slot的command列表处理,其查询语句是:

<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
        select *
        from t_ds_command
        where id % #{masterCount} = #{thisMasterSlot}
        order by process_instance_priority, id asc
            limit #{limit}
</select>

iii.MasterSchedulerBootstrap loop轮训查到待处理的command任务,将command任务和master host生成ProcessInstance,将ProcessInstance对象插入到t_ds_process_instance表中, 同时生成包含运行所需要的上下文信息的可执行任务workflowExecuteRunnable。 将workflowExecuteRunnablecache到本地cache processInstanceExecCacheManager,同时生产将ProcessInstance的WorkflowEventType.START_WORKFLOW生产到workflowEventQueue队列中。

上面的步骤是用户在Web页面点击启动任务后的流程,而本次的问题是Master周期调度的问题。经过查阅资料,周期调度任务则是MasterServer将其封装为命令数据并插入t_ds_process_instance表中,后续步骤如上,大致流程如下:

  • 命令分发:以用户提交的工作流请求为触发,MasterServer将其封装为命令数据并插入数据库中。

  • 任务分配:MasterServer循环查询待处理的命令,依照负载情况将任务分配到对应的ProcessInstance中。

  • 任务执行:根据DAG的依赖关系,WorkerServer会优先执行无依赖的任务,然后根据优先级逐步执行其他任务。

  • 状态反馈:任务执行过程中,WorkerServer会定期回调MasterServer,通知任务的进展和执行状态。

所以,上述的问题就在这,当Master从停止到启动时,t_ds_command中会产生大量的任务数据。

在DolphinScheduler3.2.1中,其t_ds_command数据样例为:

id  |command_type|process_definition_code|process_definition_version|process_instance_id|command_param                        |task_depend_type|failure_strategy|warning_type|warning_group_id|schedule_time      |start_time         |executor_id|update_time        |process_instance_priority|worker_group|tenant_code|environment_code|dry_run|test_flag|
----+------------+-----------------------+--------------------------+-------------------+-------------------------------------+----------------+----------------+------------+----------------+-------------------+-------------------+-----------+-------------------+-------------------------+------------+-----------+----------------+-------+---------+
1988|           6|         15921642898976|                         4|                  0|{"schedule_timezone":"Asia/Shanghai"}|               2|               1|           0|               0|2024-12-11 00:36:40|2024-12-11 00:39:01|          2|2024-12-11 00:39:01|                        2|default     |default    |              -1|      0|        0|
1989|           6|         15921642898976|                         4|                  0|{"schedule_timezone":"Asia/Shanghai"}|               2|               1|           0|               0|2024-12-11 00:36:50|2024-12-11 00:39:01|          2|2024-12-11 00:39:01|                        2|default     |default    |              -1|      0|        0|
1990|           6|         15921642898976|                         4|                  0|{"schedule_timezone":"Asia/Shanghai"}|               2|               1|           0|               0|2024-12-11 00:37:00|2024-12-11 00:39:01|          2|2024-12-11 00:39:01|                        2|default     |default    |              -1|      0|        0|

而command_type的枚举由源码中的CommandType定义,其内容如下:

/**
 * command types
 * 0 start a new process
 * 1 start a new process from current nodes
 * 2 recover tolerance fault process
 * 3 recover suspended process
 * 4 start process from failure task nodes
 * 5 complement data
 * 6 start a new process from scheduler
 * 7 repeat running a process
 * 8 pause a process
 * 9 stop a process
 * 10 recover waiting thread
 * 11 recover serial wait
 * 12 start a task node in a process instance
 */
START_PROCESS(0, "start a new process"),
START_CURRENT_TASK_PROCESS(1, "start a new process from current nodes"),
RECOVER_TOLERANCE_FAULT_PROCESS(2, "recover tolerance fault process"),
RECOVER_SUSPENDED_PROCESS(3, "recover suspended process"),
START_FAILURE_TASK_PROCESS(4, "start process from failure task nodes"),
COMPLEMENT_DATA(5, "complement data"),
SCHEDULER(6, "start a new process from scheduler"),
REPEAT_RUNNING(7, "repeat running a process"),
PAUSE(8, "pause a process"),
STOP(9, "stop a process"),
RECOVER_WAITING_THREAD(10, "recover waiting thread"),
RECOVER_SERIAL_WAIT(11, "recover serial wait"),
EXECUTE_TASK(12, "start a task node in a process instance"),
DYNAMIC_GENERATION(13, "dynamic generation"),
;

那为什么会这样呢?本质上是Master自身的容错机制造成的,其容错机制可以细分为如下几个模块:

  • 1)Master自身的容错:如果是多Master同时运行,其中一个作为Active Master负责处理任务调度请求,其他节点作为Standby Master。当Active Master出现故障时,Standby Master将自动接管其工作,确保系统的正常运行。这是通过ZooKeeper实现的,ZooKeeper负责选举Active Master节点,并监控节点的状态。

  • 2)状态同步:多Master节点之间会进行状态同步,以确保在Active Master宕机时,Standby Master能够接管任务调度。

  • 3)故障恢复:当Master节点宕机后,其他Master节点会通过ZooKeeper的Watcher机制监听到这一事件,并触发故障恢复。

  • 4)正在运行任务的容错:当前Master节点宕机后,新Master会通过已下线的Master的地址和正在运行的工作流状态数组获取需要容错的ProcessInstance列表,之后将其放入t_ds_command表中(后续流程就是Master获取到并调度+Worker执行了)。

  • 5)分布式锁:在容错过程中,Master节点会使用ZK分布式锁+采用指定command表分配ID的形式来确保只有一个Master节点执行容错操作,避免多个Master节点同时接管同一个任务。

  • 6)定时容错线程:除了ZooKeeper的事件触发容错外,DolphinScheduler还实现了一个定时线程FailoverExecuteThread,用于Master重启后恢复自身之前的工作流实例。

  • 7)任务重试:DolphinScheduler还支持任务失败后的重试机制,这与服务宕机容错相辅相成,确保任务的最终执行成功。

所以,此时根据原理+复现可以初步推测出,是在Master启动时的某一个线程进行的定时容错,接下来就进入源码来真正验证一下。

04 源码解析

在org.apache.dolphinscheduler.server.master.MasterServer下,启动Master时会有run入口:

/**
 * run master server
 */
@PostConstruct
public void run() throws SchedulerException {
    // init rpc server
    this.masterRPCServer.start();
    // install task plugin
    this.taskPluginManager.loadPlugin();
    this.masterSlotManager.start();
    // self tolerant
    this.masterRegistryClient.start();
    this.masterRegistryClient.setRegistryStoppable(this);
    this.masterSchedulerBootstrap.start();
    this.eventExecuteService.start();
    this.failoverExecuteThread.start();
    this.schedulerApi.start();
    this.taskGroupCoordinator.start();
    MasterServerMetrics.registerMasterCpuUsageGauge(() -> {
        SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
        return systemMetrics.getTotalCpuUsedPercentage();
    });
    MasterServerMetrics.registerMasterMemoryAvailableGauge(() -> {
        SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
        return (systemMetrics.getSystemMemoryMax() - systemMetrics.getSystemMemoryUsed()) / 1024.0 / 1024 / 1024;
    });
    MasterServerMetrics.registerMasterMemoryUsageGauge(() -> {
        SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
        return systemMetrics.getJvmMemoryUsedPercentage();
    });
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (!ServerLifeCycleManager.isStopped()) {
            close("MasterServer shutdownHook");
        }
    }));
}

通过上面的代码,可以看到Master启动执行了:

  • masterRPCServer.start():初始化并启动RPC服务器,用于节点间通信。

  • taskPluginManager.loadPlugin():加载任务插件,这些插件可以扩展DolphinScheduler的任务类型。

  • masterSlotManager.start():启动Master的Slot管理器,它负责管理Master的资源槽位,用于任务调度。

  • masterRegistryClient.start():启动Master的注册客户端,它负责将Master节点注册到分布式协调服务(如ZooKeeper)中。

  • masterRegistryClient.setRegistryStoppable(this):设置注册客户端的可停止对象,以便在Master停止时能够进行清理工作。

  • masterSchedulerBootstrap.start():启动Master的调度引导服务,它负责初始化调度相关的服务。

  • eventExecuteService.start():启动事件执行服务,它负责处理工作流中的事件,如任务状态变化。

  • failoverExecuteThread.start():启动故障恢复执行线程,它负责在Master宕机后恢复任务执行。

  • schedulerApi.start():启动调度API服务,提供调度相关的接口供外部调用。

  • taskGroupCoordinator.start():启动任务组协调器,它负责协调任务组内的任务执行。

经过源码探查,发现最关键的failoverExecuteThread不是重新执行未调度的周期任务,而是容错未执行完的任务。并且其他源码中也没有关于恢复周期任务调度的内容。

那现在需要换一个思路,就是从下往上走:

  1. 首先发现重启恢复后,Web页面上的“运行类型”是“调度执行”,而数据库的“command_type”是“6”,那就意味着必须有一个服务会有往数据库里面去插入command_type为6的方法。并且其会去获取t_ds_schedules表中的任务定时调度实例。

  2. 根据源码,排查到dolphinscheduler-dao项目下会存放所有的数据库操作DAO,遂可以找到ScheduleMapper类,此类是和t_ds_schedules相关的DAO类;之后根据t_ds_command反查,找到了CommandServiceImpl类中的createCommand方法;再根据两者反查+command_type为6,找到了ProcessScheduleTask类中的executeInternal方法。

  3. ProcessScheduleTask类中的executeInternal方法,同时满足:获取了调度任务、插入command数据、类型为6这三个条件。

  4. 查看ProcessScheduleTask的executeInternal源码,前半部分是从Quartz上下文中获取到预定义的调度时间和调度实际运行时间,下半部分是校验这个调度Cron是否存在和上线。

  5. 在executeInternal中,最关键的其实就是scheduledFireTime和fireTime。

找到这里的话,我们再结合DolphinScheduler+Quartz总结一下调度的原理:

  • Web页面设置调度,其会通过SchedulerController中的createSchedule()来创建调度,并往t_ds_schedules中插入一条数据;

  • Web页面设置调度上线,其会通过QuartzScheduler中的insertOrUpdateScheduleTask()向Quartz中创建Trigger触发器,并往QRTZ_CRON_TRIGGERS中插入一条数据;

  • 之后定期调用ProcessScheduleTask中的executeInternal()来往t_ds_command中插入数据;

  • 之后就是Master-Worker的执行流程了;

了解了大致的调度流程后,结合源码中的scheduledFireTime和fireTime,就可以推断出调度时间不是由DolphinScheduler设置的,而是由Quartz设置的。

那就继续查阅Quartz相关的资料,发现在Quartz中有一个misfire机制:周期性任务A需要在某个规定的时间执行,但是由于某种原因导致任务A未执行,称为MisFire。

而Quartz判断一个任务是MisFire,提供了一个配置项:org.quartz.jobStore.misfireThreshold,默认是60000ms(即60秒)。

misfire产生需要有2个前置条件:

  • 一个是job到达触发时间时没有被执行;

  • 二是被执行的延迟时间超过了Quartz配置的misfireThreshold阀值;

如果延迟执行的时间小于阀值,则Quartz不认为发生了misfire,立即执行job;如果延迟执行的时间大于或者等于阀值,则被判断为misfire,然后会按照指定的策略来执行。

那misfire产生的原因一般如下:

  • 当job达到触发时间时,所有线程都被其他job占用,没有可用线程。;

  • 在job需要触发的时间点,scheduler停止了(可能是意外停止的);【——当前的问题属于这种类型】

  • job使用了@DisallowConcurrentExecution注解,job不能并发执行,当达到下一个job执行点的时候,上一个任务还没有完成;

  • job指定了过去的开始执行时间,例如当前时间是8点00分00秒,指定开始时间为7点00分00秒;

而判定了任务是MisFire后,会有一个补偿机制,补偿机制只有在任务确认为MisFire状态后,才会被执行。补偿机制配置在Quartz源码的Trigger中:

public interface Trigger extends Serializable, Cloneable, Comparable<Trigger> {
    long serialVersionUID = -3904243490805975570L;
    int MISFIRE_INSTRUCTION_SMART_POLICY = 0;
    int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
    int DEFAULT_PRIORITY = 5;
    ......

但是这个补偿机制需要根据Trigger来判定,如下是不同的Trigger:

file

在DolphinScheduler中,各种类型的Trigg都会涉及到:

file

Trigger的类型:

  • SimpleTrigger是一个简单的触发器,用于执行重复任务。它可以指定一个起始时间,然后按照固定的间隔时间重复执行任务,直到达到指定的重复次数。SimpleTrigger的属性包括重复间隔(repeatInterval)和重复次数(repeatCount),实际执行次数是repeatCount + 1,因为在开始时间(startTime)时会执行一次。

  • CronTrigger:CronTrigger使用Cron表达式来定义复杂的调度计划。Cron表达式由6或7个空格分隔的时间字段组成,分别表示秒、分、小时、一个月中的日期、月份、一周中的日期和可选的年份。CronTrigger允许设定非常复杂的触发时间表,基本上覆盖了其他触发器的绝大部分能力。

  • CalendarIntervalTrigger:CalendarIntervalTrigger指定从某一个时间开始,以一定的时间间隔执行的任务。不同于SimpleTrigger只支持毫秒单位的时间间隔,CalendarIntervalTrigger支持的间隔单位有秒、分钟、小时、天、月、年。它适合的任务类似于每周执行一次。

  • DailyTimeIntervalTrigger:DailyTimeIntervalTrigger指定每天的某个时间段内,以一定的时间间隔执行任务。并且它可以支持指定星期。它适合的任务类似于每天9:00到18:00,每隔70秒执行一次,并且只要周一至周五执行。

  • ......

所以,因为不同的Trigger类型其参数是不一样的,所以当Trigger触发Misfire机制时,根据Trigger的不同,策略也会不同:

/**
 公共的Misfire机制,在Trigger类中
 **/
 // 这是一个智能策略,Quartz会根据Trigger的类型自动选择一个合适的misfire策略。对于CronTrigger,默认使用MISFIRE_INSTRUCTION_FIRE_ONCE_NOW。
int MISFIRE_INSTRUCTION_SMART_POLICY = 0;
// 这个策略会将所有错过的触发事件,立即执行所有补偿动作。即使定时任务执行的时间已经结束,它也会把所有应该执行的任务一次性全部执行完。
int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
/**
 SimpleTrigger的Misfire机制,在SimpleTrigger类中
 **/
 // 如果触发器错过了预定的触发时间,这个策略会立即执行一次任务,然后按照原计划继续执行后续的任务。
int MISFIRE_INSTRUCTION_FIRE_NOW = 1;
// 这个策略会将触发器的开始时间设置为当前时间,并立即执行错过的任务,包括已经错过的重复次数。
int MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT = 2;
// 类似于MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT,但是会忽略已经错过的触发次数,只执行剩余的重复次数。
int MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT = 3;
// 这个策略会忽略已经错过的触发次数,并在下一个预定的触发时间执行任务,执行剩余的重复次数。
int MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT = 4;
// 类似于MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT,但是会包括所有错过的重复次数。
int MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT = 5;
/** 
CronTrigger的Misfire机制,在CronTrigger类中
**/
// 如果触发器错过了预定的触发时间,这个策略会立即执行一次任务,然后按照原计划继续执行后续的任务。
int MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1;
// 对于CronTrigger,这个策略会忽略所有错过的触发事件,直接等待下一次预定的触发时间。
int MISFIRE_INSTRUCTION_DO_NOTHING = 2;
......

在QuartzScheduler的insertOrUpdateScheduleTask()中,用的只有CronTrigger,其源码如下:

CronTrigger cronTrigger = newTrigger()
        .withIdentity(triggerKey)
        .startAt(startDate)
        .endAt(endDate)
        .withSchedule(
                cronSchedule(cronExpression)
                        .withMisfireHandlingInstructionIgnoreMisfires()
                        .inTimeZone(DateUtils.getTimezone(timezoneId)))
        .forJob(jobDetail).build();
// 往下走
public CronScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires() {
    this.misfireInstruction = -1;
    return this;
}

其补偿机制采用的-1编码,也就是会将所有错过的触发事件,立即执行所有补偿动作。所以此时就可以解释,为什么Master重启后,会将所有的未执行的周期任务,全部执行一次!!!

这个设置根据Trigger的不同,也可以分别设置不同的参数:

/**
 CronTrigger,引用CronScheduleBuilder中的设置
 **/
public CronScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires() {
    this.misfireInstruction = -1;
    return this;
}
public CronScheduleBuilder withMisfireHandlingInstructionDoNothing() {
    this.misfireInstruction = 2;
    return this;
}
public CronScheduleBuilder withMisfireHandlingInstructionFireAndProceed() {
    this.misfireInstruction = 1;
    return this;
}
/**
 SimpleTrigger,引用SimpleScheduleBuilder的设置
**/
public SimpleScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires() {
    this.misfireInstruction = -1;
    return this;
}
public SimpleScheduleBuilder withMisfireHandlingInstructionFireNow() {
    this.misfireInstruction = 1;
    return this;
}
public SimpleScheduleBuilder withMisfireHandlingInstructionNextWithExistingCount() {
    this.misfireInstruction = 5;
    return this;
}
public SimpleScheduleBuilder withMisfireHandlingInstructionNextWithRemainingCount() {
    this.misfireInstruction = 4;
    return this;
}
public SimpleScheduleBuilder withMisfireHandlingInstructionNowWithExistingCount() {
    this.misfireInstruction = 2;
    return this;
}
public SimpleScheduleBuilder withMisfireHandlingInstructionNowWithRemainingCount() {
    this.misfireInstruction = 3;
    return this;
}

05 解决方案

  • 将任务设置为“串行等待”。——可行,但无法发挥出大数据集群的并行化优势。并且有一个致命的缺陷,就是串行等待的任务无法在页面数手动停止,需要去到t_ds_process_instance中更改状态或删除数据。

file

  • Master HA:单机Master时,对Master设置守护,宕机后自动拉起(但也有无法拉起的时候);多机Master时,部署多台Master,使其可以实现HA。

  • DolphinScheduler监控告警:持续监控DolphinScheduler运行状态,当有角色宕机后,及时发出告警信息(会有半夜宕机而运维人员没有及时发现告警信息的情况)。

  • 设置DolphinScheduler的CPU和内存使用阈值:在配置文件中,默认的CPU和内存阈值是70%,以为着当服务器的CPU和内存占用达到了70%后,DolphinScheduler就不会在这台服务器上调度任务了。这种方式的好处是可以保证服务器资源不被打满,弊端是如果Master容错的旧任务打满了资源,那就会影响DolphinScheduler正常状态下的新任务了。并且有的任务是非常关键的任务,必须要跑成功的。

  • 设置DolphinScheduler的任务数:在配置文件中,DolphinScheduler默认的任务数是单Worker100个,单Master是1000个。而在现网中,无法对任务数去做到精细的控制,并且DolphinScheduler也无法做到自动调配。

  • 在宕机后重新启动前删除t_ds_command表中的数据:经过验证,Master在宕机后是不会往t_ds_command中写数据了。其会在重启启动后,将数据写到t_ds_command后执行,但其中的时间大概就1~2秒钟,手工无法去执行删除。

  • 修改t_ds_process_instance中的数据:根据时间周期,修改t_ds_process_instance中所有这个范围内的工作流的状态,人工使其结束(但如果DolphinScheduler和元数据库在一台服务器上,容易DolphinScheduler启动后里面把服务器资源打满,造成无法操作元数据库了)。

上面的解决方案主要是分为:

  • 避免或减少Master的宕机;

  • 在Master宕机后,不要运行MisFire的任务;

首先是“避免或减少Master宕机”,这在生产环境中是很难做到的,计算机程序的假设就是100%会在某一个时刻产生某些问题,所以才有了各种微服务架构、高可用HA、多活、容灾等等机制。

其次是“不要运行MisFire的任务”,依照前面的解决方案,没有一个方案能解决这个问题。所以,根据之前的源码解析,需要考虑采用源码修改+重新编译打包的方式进行解决。

06 修改源码

将关键源码修改为:

            CronTrigger cronTrigger = newTrigger()
                    .withIdentity(triggerKey)
                    .startAt(startDate)
                    .endAt(endDate)
                    .withSchedule(
                            cronSchedule(cronExpression)
                                    .withMisfireHandlingInstructionDoNothing()
//                                    .withMisfireHandlingInstructionIgnoreMisfires()
                                    .inTimeZone(DateUtils.getTimezone(timezoneId)))
                    .forJob(jobDetail).build();

07 开发环境验证

使用Java8进行。

更改Master、worker、API下的application.yaml中的MySQL链接信息:

spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://IP地址:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false
    username: 账号
    password: 密码
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

更改Master、Worker、API下的Zookeeper信息:

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler_dev
    connect-string: IP地址:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

更改bom下面的pom:

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql-connector.version}</version>
<!--                <scope>test</scope>-->
            </dependency>

更改api、master、worker下的logback-spring.xml,开启运行日志:

    <root level="INFO">
<!--        <if condition="${DOCKER:-false}">-->
<!--            <then>-->
<!--                <appender-ref ref="STDOUT"/>-->
<!--            </then>-->
<!--        </if>-->
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="APILOGFILE"/>
    </root>

    <root level="INFO">
<!--        <if condition="${DOCKER:-false}">-->
<!--            <then>-->
<!--                <appender-ref ref="STDOUT"/>-->
<!--            </then>-->
<!--        </if>-->
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="TASKLOGFILE"/>
        <appender-ref ref="MASTERLOGFILE"/>
    </root>
    <root level="INFO">
<!--        <if condition="${DOCKER:-false}">-->
<!--            <then>-->
<!--                <appender-ref ref="STDOUT"/>-->
<!--            </then>-->
<!--        </if>-->
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="TASKLOGFILE"/>
        <appender-ref ref="WORKERLOGFILE"/>
    </root>

启动 Master、Worker、Api:

  • Master VM Options:-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql

  • Worker VM Options:-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql

  • Api VM Options:-Dlogging.config=classpath:logback-spring.xml -Dspring.profiles.active=api,mysql

file

如果报错:

Error running 'ApiApplicationServer' Error running ApiApplicationServer. Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun. 则加入:

file

如果还是报错缺少MySQL的JDBC驱动包,则在Master、Woker、API的Pom下面添加:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>

08 整体编译打包

需要注意,此时打包的项目,需要只是经过了《修改源码》环境的,不是进行了《开发环境验证》环节的!!!

使用Java8进行。

在项目根目录下执行命令,打包时间较长:

mvn spotless:apply clean package -Dmaven.test.skip=true -Prelease

打包后的二进制文件在dolphinscheduler-dist/target 下 bin.tar.gz 后缀文件。

之后就可以尝试重新部署,验证是否解决上面的问题了。

09 只编译单个模块

去到dolphinscheduler-scheduler-quartz根目录下,执行:

mvn spotless:apply clean package -Dmaven.test.skip=true -Prelease

打包后的文件在dolphinscheduler-scheduler-quartz/target目录下:

file

将其在服务器上进行替换:

su dolphinscheduler -

mv /opt/module/dolphinscheduler-3.2.1/master-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/master-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar.bak
mv /opt/module/dolphinscheduler-3.2.1/api-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/api-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar.bak

cp dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/master-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar
cp dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/api-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar

chown -R dolphinscheduler:dolphinscheduler /opt/module/dolphinscheduler-3.2.1/

之后就可以尝试重启DolphinScheduler,验证是否解决上面的问题了。

10 问题解决

再次进行问题复现,发现问题已经被解决了:

file file

至此,本次问题排查及修复完成。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2276145.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python学习(三)基础入门(数据类型、变量、条件判断、模式匹配、循环)

目录 一、第一个 Python 程序1.1 命令行模式、Python 交互模式1.2 Python的执行方式1.3 SyntaxError 语法错误1.4 输入和输出 二、Python 基础2.1 Python 语法2.2 数据类型1&#xff09;Number 数字2&#xff09;String 字符串3&#xff09;List 列表4&#xff09;Tuple 元组5&…

LLM - Llama 3 的 Pre/Post Training 阶段 Loss 以及 logits 和 logps 概念

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/145056912 Llama 3 是 Meta 公司发布的开源大型语言模型&#xff0c;包括具有 80 亿和 700 亿参数的预训练和指令微调的语言模型&#xff0c;支持…

[RabbitMQ] RabbitMQ运维问题

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏: &#x1f9ca; Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm1001.2014.3001.5482 &#x1f355; Collection与…

MongoDB如何使用

1.简单介绍 MongoDB是一个开源、高性能、无模式的文档型数据库&#xff0c;当初的设计就是用于简化开发和方便扩展&#xff0c;是NoSQL数据库产品中的一种。是最 像关系型数据库&#xff08;MySQL&#xff09;的非关系型数据库。 MongoDB是一个基于分布式文件存储的数据库由C语…

【2024年华为OD机试】(C卷,100分)- 分割均衡字符串 (Java JS PythonC/C++)

一、问题描述 题目描述 均衡串定义&#xff1a;字符串中只包含两种字符&#xff0c;且这两种字符的个数相同。 给定一个均衡字符串&#xff0c;请给出可分割成新的均衡子串的最大个数。 约定&#xff1a;字符串中只包含大写的 X 和 Y 两种字符。 输入描述 输入一个均衡串…

React Fiber框架中的Commit提交阶段——commitMutationEffect函数

Render阶段 Render阶段可大致归为beginWork&#xff08;递&#xff09;和completeWork&#xff08;归&#xff09;两个阶段 1.beginWork流程&#xff08;递&#xff09; 建立节点的父子以及兄弟节点关联关系 child return sibling属性给fiber节点打上flag标记(当前节点的flag) …

【STM32-学习笔记-6-】DMA

文章目录 DMAⅠ、DMA框图Ⅱ、DMA基本结构Ⅲ、不同外设的DMA请求Ⅳ、DMA函数Ⅴ、DMA_InitTypeDef结构体参数①、DMA_PeripheralBaseAddr②、DMA_PeripheralDataSize③、DMA_PeripheralInc④、DMA_MemoryBaseAddr⑤、DMA_MemoryDataSize⑥、DMA_MemoryInc⑦、DMA_DIR⑧、DMA_Buff…

IoT平台在设备远程运维中的应用

IoT平台是物联网技术的核心组成部分&#xff0c;实现了设备、数据、应用之间的无缝连接与交互。通过提供统一的设备管理、数据处理、安全监控等功能&#xff0c;IoT平台为企业构建了智能化、可扩展的物联网生态系统。在设备远程运维领域&#xff0c;IoT平台发挥着至关重要的作用…

浅谈云计算05 | 云存储等级及其接口工作原理

一、云存储设备 在当今数字化飞速发展的时代&#xff0c;数据已然成为个人、企业乃至整个社会的核心资产。从日常生活中的珍贵照片、视频&#xff0c;到企业运营里的关键业务文档、客户资料&#xff0c;数据量呈爆炸式增长。面对海量的数据&#xff0c;如何安全、高效且便捷地存…

网络传输层TCP协议

传输层TCP协议 1. TCP协议介绍 TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff09;是一个要对数据的传输进行详细控制的传输层协议。 TCP 与 UDP 的不同&#xff0c;在于TCP是有连接、可靠、面向字节流的。具体来说&#xff0c;TCP设置了一大…

【Linux系列】`find / -name cacert.pem` 文件搜索

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

【论文笔记】Sign Language Video Retrieval with Free-Form Textual Queries

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Sign Language Video Retr…

Observability:将 OpenTelemetry 添加到你的 Flask 应用程序

作者&#xff1a;来自 Elastic jessgarson 待办事项列表可以帮助管理与假期计划相关的所有购物和任务。使用 Flask&#xff0c;你可以轻松创建待办事项列表应用程序&#xff0c;并使用 Elastic 作为遥测后端&#xff0c;通过 OpenTelemetry 对其进行监控。 Flask 是一个轻量级…

网站目录权限加固

说明 在一个入侵链路中&#xff0c;往往是利用某个安全漏洞&#xff0c;向服务器写入或上传一个webshell&#xff08;后门&#xff09;&#xff0c;再通过webshell提权或进行后续渗透入侵行为。 这个过程中&#xff0c;获取webshell是最关键最重要的一个步骤&#xff0c;如能在…

qt QPainter setViewport setWindow viewport window

使用qt版本5.15.2 引入viewport和window目的是用于实现QPainter画出来的内容随着窗体伸缩与不伸缩两种情况&#xff0c;以及让QPainter在widget上指定的区域(viewport)进行绘制/渲染&#xff08;分别对应下方demo1&#xff0c;demo2&#xff0c;demo3&#xff09;。 setViewpo…

一些计算机零碎知识随写(25年1月)-1

我原以为世界上有技术的那批人不会那么闲&#xff0c;我错了&#xff0c;被脚本真实了。 今天正隔着画画呢&#xff0c;手机突然弹出几条安全告警通知。 急忙打开服务器&#xff0c;发现问题不简单&#xff0c;直接关服务器重装系统..... 首先&#xff0c;不要认为小网站&…

分布式锁Redisson详解,Redisson如何解决不可重入,不可重试,超时释放,主从一致问题的分析解决(包括源码简单分析)

目录 1. Redisson解决不可重入锁导致的死锁问题 2. 不可重试问题 Pub/Sub 的优势 锁释放的发布逻辑 3. 超时释放的问题 1. 锁的超时释放机制背景 2. 源码分析 2.1 锁的获取 2.2 看门狗机制 2.3 看门狗续期实现 2.4 手动设置锁的过期时间 总结 4. 主从一致性 问题…

【微服务】面试 4、限流

微服务限流技术总结 一、微服务业务面试题引入 在微服务业务面试中&#xff0c;限流是重要考点&#xff0c;常与分布式事务、分布式服务接口幂等解决方案、分布式任务调度等一同被考查。面试官一般会询问项目中是否实施限流及具体做法&#xff0c;回答需涵盖限流原因、采用的方…

爬虫基础之爬取歌曲宝歌曲批量下载

声明&#xff1a;本案列仅供学习交流使用 任何用于非法用途均与本作者无关 需求分析: 网站:邓紫棋-mp3在线免费下载-歌曲宝-找歌就用歌曲宝-MP3音乐高品质在线免费下载 (gequbao.com) 爬取 歌曲名 歌曲 实现歌手名称下载所有歌曲 本案列所使用的模块 requests (发送…

树莓派-5-GPIO的应用实验之GPIO的编码方式和SDK介绍

文章目录 1 GPIO编码方式1.1 管脚信息1.2 使用场合1.3 I2C总线1.4 SPI总线2 RPI.GPIO2.1 PWM脉冲宽度调制2.2 静态函数2.2.1 函数setmode()2.2.2 函数setup()2.2.3 函数output()2.2.4 函数input()2.2.5 捕捉引脚的电平改变2.2.5.1 函数wait_for_edge()2.2.5.2 函数event_detect…