XXL-JOB中断信号感知

news2025/2/24 11:58:51

目录

背景

思路

实现逻辑

总结


背景

  在使用xxl-job框架时,由于系统是由线程池去做异步逻辑,然后主线程等待,在控制台手动停止时,会出现异步线程不感知信号中断的场景,如下场景

而此时如果人工在控制台停止xxl-job执行,异步任务并不会感知到调度线程被interrupt了,上面3个异步任务仍旧执行,而主线程却退出了,如果此时再次调度该任务,而代码逻辑没做幂等,可能出现预期外的异常

思路

  先看看xxl-job trigger的时序图

原图plantuml

    @startuml
'https://plantuml.com/sequence-diagram

!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用

actor "User" as user

box "xxl-job-admin" #LightGray
    participant "controller"  as controller
    participant "trigger" as trigger
    participant "executor-proxy" as proxy
    participant "adminBiz" as admin
end box

box "xxl-job-client" #LightGray
participant "executor"  as executor
participant "jobThread" as job
participant "callBackThread" as callback
participant "retryCallThread" as retryCallBack
end box


autonumber 1
user -> controller++:手动调度 /jobinfo/trigger
controller->trigger++: jobId/触发类型/参数
trigger->trigger:提交trigger任务
    group 异步流程
    trigger->trigger:根据jobId获取jobInfo
    trigger->trigger:获取执行器信息
    note left
    已注册的机器地址列表
    end note
    alt 分片广播
    loop
    trigger->trigger:遍历触发
    end loop
    else 其他
    trigger->trigger:单个触发
    end
    end group
    return 返回提交结果
    == 异步rpc触发==
    autonumber 1
        group 触发流程
        trigger->trigger:获取路由策略&阻塞策略
        trigger->trigger:根据路由策略获取需调度的机器地址
        trigger -> proxy++:获取执行器代理对象&缓存
        note left
        jdk代理+netty
        xxljob的log是客户端记录在本地文件
        admin调用时也通过代理调用远端接口
        end note
        proxy->executor:远程调用(传递触发信息)
        executor->executor:根据jobId获取执行线程
        executor->executor:获取job执行器
        alt 执行线程不为空
        executor->executor:根据阻塞策略处理
        end
        alt 执行线程为空
        executor->executor:新建job线程
        end
        executor->job++:把任务参数加入阻塞队列
        job->job:jobId去重
        return:返回结果
        return:返回结果
        end group
    == 异步jobThread ==
    autonumber 1
    job->job:执行handler init 方法
    loop toStop=false
    job->job:从阻塞队列中获取任务参数
    job->job:准备工作
    note left
    状态设置为运行中
    空闲次数=0
    去除jobId
    设置logFile&分片信息
    end note
   alt 超时时间>0
   job->job:新建线程处理handler信息
   else
   job->job:本线程处理handler信息
   end
   job->job:把执行结果or终止结果加入callback阻塞队列
    end loop
    job->job:清除阻塞队列里的待任务
    note left
    此时已经该线程已经被停止了
    end note
    == 异步callBackThread ==
  autonumber 1
  loop toStop=false
  callback->callback:从callback阻塞队列中获取callback参数
  alt 获取成功
  callback->callback:清空当前阻塞队列中的参数,并将其放到一个新的list
  loop 遍历admin列表
  callback->controller++:调用callback接口
  controller->admin:调用callback逻辑
  alt 任务处理成功
  admin->admin:获取job信息
  admin->admin:获取子任务信息
  loop 遍历子任务
  admin->trigger:提交trigger任务
  end loop
  admin->admin:更新job信息
  end
  return:返回回调结果
  callback->callback:记录日志到本地文件
  alt 回调失败
  callback->callback:记录序列化后的失败参数,用于重试
  end
  end loop
  end
  end loop
== 重试retryCallBack ==
autonumber 1
  loop toStop=false
    retryCallBack->retryCallBack:获取本地重试文件信息
    retryCallBack->retryCallBack:反序列化内容,重试callback请求
  end loop
@enduml

主要关注异步JobThread部分,可以看出是有个toStop的flag去感知这个中断信号的,那怎么去获取toStop的信息呢?这里可以通过起另一个线程去检查这个信号,如果为stop,则透传到异步task中,设计流程如下

原图plantuml

@startuml
'https://plantuml.com/sequence-diagram

!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用

actor "xxl-job-admin" as user

box "xxl-job-client" #LightGray
    participant "xxl-client"  as client
    participant "xxl-main-thread"  as mainThread
    participant "check-interrupt-thread" as checkThread
    participant "async-task..." as asyncThread
end box



autonumber 1
user -> client++:手动调度 /jobinfo/trigger
client->client:加入任务队列
return
client-->mainThread:获取队列任务执行
mainThread->mainThread++:init
mainThread->checkThread++:定期检查mainThread的 stopFlag属性
loop
checkThread->checkThread:定期检查停止属性属性

end loop
mainThread->mainThread:初始化完毕
mainThread->asyncThread:分发任务
asyncThread-->asyncThread++:任务执行
mainThread->mainThread:等待子任务执行完成
user->client:手动中断任务
client->client:捞取jobId对应的线程
client->mainThread:调用暂停方法,interrupt,设置 stopFlag
mainThread-->client:返回暂停结果
mainThread->mainThread:等待执行中的子任务完成
checkThread->asyncThread:设置给子任务 stopFlag
asyncThread->asyncThread:业务逻辑判断 stopFlag
return:stop
mainThread->mainThread:等待检查线程完成
return:check-thread end
mainThread->mainThread:后置处理
return:stop
@enduml

即对于异步的任务,可以做一个封装,用于接受中断信号,而信息的传递则通过threadLocal复制的方式给到异步任务,主要是解决中断信号如何传递到异步任务的问题,异步任务可以通过某个方法来获取主线程是否中断

要点如下

  1. 感知xxl-job主线程的中断信号
  2. 传递中断信号到异步任务,异步任务执行的方法可以手动调用某个方法判断是否中断,进而更快地停止任务

实现逻辑

定义异步任务封装类,用于接受信息

public class TaskWrapper<T> implements Runnable {
    private Runnable runnable;
    private volatile boolean isInterrupt;
    private Supplier<T> supplier;
    private T result;
    private final String taskId;

    private Map<String, String> copyMdc = null;

    //有需要传递的变量可以通过context传递
    private Map<String, Object> executeContext = null;
    Throwable errorCause;

    TaskWrapper(Runnable runnable, String taskId) {
        this.runnable = runnable;
        this.isInterrupt = false;
        this.taskId = taskId;
        copyMdc = MDC.getCopyOfContextMap();
        executeContext = XxlShardingTask.getCopyOfContext();
    }

    TaskWrapper(Supplier<T> supplier, String taskId) {
        this.supplier = supplier;
        this.isInterrupt = false;
        this.taskId = taskId;
        copyMdc = MDC.getCopyOfContextMap();
        executeContext = XxlShardingTask.getCopyOfContext();
    }

    @Override
    public void run() {
        if (!CollectionUtils.isEmpty(copyMdc)) {
            MDC.setContextMap(copyMdc);
        }
        if (!CollectionUtils.isEmpty(executeContext)) {
            XxlShardingTask.setExecuteContext(executeContext);
        }
        XxlShardingTask.setWrapper(this);
        try {
            if (isInterrupt) {
                return;
            }
            if (runnable != null) {
                runnable.run();
            }
            if (supplier != null) {
                result = supplier.get();
            }
        } finally {
            MDC.clear();
            XxlShardingTask.removeContext();
        }
    }

    static boolean isInterrupt() {
        return Optional.ofNullable(XxlShardingTask.getFromContext(XxlShardingTask.EXECUTE_KEY)).map(e -> ((TaskWrapper<?>) e).interrupted()).orElse(Boolean.FALSE);
    }

    public T getResult() {
        return result;
    }

    public String getTaskId() {
        return taskId;
    }

    public Throwable getErrorCause() {
        return errorCause;
    }

    /**
     * 是否成功
     *
     * @return
     */
    public boolean isSuccess() {
        return !isInterrupt && errorCause == null;
    }

    public boolean interrupted() {
        return isInterrupt;
    }

    synchronized void setInterrupt() {
        this.isInterrupt = true;
    }
}

在xxljob的主线程初次调用时,会调用init方法,定一个handler继承xxljob的IJobHandler,并实现

他的init方法,新建检查线程用于check中断信号,执行过程中,会把当前在跑的任务丢到一个map中存储,而检查线程会调用异步任务,把对应的标志未置为停止

public abstract class XxlAsyncTaskHandler<T> extends IJobHandler {
...

public void init() throws InvocationTargetException, IllegalAccessException {
        super.init();
        JobThread thread = (JobThread) Thread.currentThread();
        Field toStop = ReflectionUtils.findField(JobThread.class, "toStop");
        if (toStop == null) {
            throw new IllegalStateException("current thread don't have field [toStop],please check the xxl-job version");
        }
        mainThreadInterrupt.set(false);
        ReflectionUtils.makeAccessible(toStop);
        checkInterruptThread = new Thread(() -> {
            try {
                while (!mainThreadInterrupt.get()) {
                    TimeUnit.MILLISECONDS.sleep(getCheckInterruptMills());
                    if ((boolean) toStop.get(thread)) {
                        if (mainThreadInterrupt.compareAndSet(false, true)) {
                            currentRunTask.forEach((s, tTaskWrapper) -> {
                                tTaskWrapper.setInterrupt();
                            });

                        }
                    }
                }
            } catch (InterruptedException e) {
                //ignore
            } catch (Exception ex) {
                LOGGER.error("check interrupt error", ex);
            }
        });
        checkInterruptThread.start();
    }

}

主流程(即xxl-job调度线程所执行的execute方法)通过获取待执行的任务,对其进行封装,并加入到当前在运行的任务map中,核心的代码如下,逻辑流程

  1. 从任务生成器中获取待执行的封装好的任务
  2. 并加入到异步线程池执行
  3. 主线程等待
 while (currentTaskGenerator.hasNextTask()) {
                List<TaskWrapper<T>> wrappers = new ArrayList<>();
                for (int i = 0; i < parallelCount; i++) {
                    if (currentTaskGenerator.hasNextTask()) {
                        TaskWrapper<T> nextTask = currentTaskGenerator.getNextTask();
                        String taskId = nextTask.getTaskId();
//加入到当前执行中的任务
                        currentRunTask.put(taskId, nextTask);
                        CompletableFuture.runAsync(nextTask, executor).whenComplete((unused, throwable) -> {
                            if (throwable != null) {
                                currentRunTask.get(taskId).errorCause = throwable;
                            } else {
                                if (nextTask.isSuccess()) {
                                    successCount.incrementAndGet();
                                }
                            }
//任务处理完,countDown一下
                            count.countDown();
                            currentRunTask.remove(taskId);
                        });
                        //代表任务分配完毕
                    } else {
                        count.countDown();
                    }
                }
//主线程等待
                count.await();

对于异步任务的逻辑

由于开始时设置当前执行的封装任务到本地线程,可以通过static方法进行获取标识,比如循环或者一些较重的耗时操作,可以在执行前进行判断,如果中断了就返回结果

  protected static boolean isWorkerInterrupt() {
        return TaskWrapper.isInterrupt();
    }

比如继承该类,子类可以在业务逻辑进行判断

            while (!isWorkerInterrupt()) {
...业务逻辑
}

由于整块优化的异步调度任务的代码比较多,而且涉及了公司信息,不在此展示,重点在于

  1. xxl-job异步线程如何感知主线程中断信息——了解xxljob trigger原理,封装runnable,管理当前封装的runnable任务,把中断信息透传异步任务
  2. 线程间的信息如何传递——这里通过封装runnable类作为一个信息载体,threadLocal用于接受信息,实现不同线程的信息传递

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1902203.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

超越YOLO! RT-DETR 实时目标检测技术介绍

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

Thisjavabean对象数组

This 1.概念 this是一个对象this是一个构造函数 2.介绍 解决局部变量和成员变量命名冲突 this在面向对象-封装那一篇里&#xff0c;有被两个地方提及。 但我们先简单给一个例子&#xff1a; public Person(String name, String phone, String qqPassword, String bankCar…

【踩坑】修复报错Cannot find DGL libdgl_sparse_pytorch_2.2.0.so

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 目录 错误复现 原因分析 解决方法 错误复现 import dgldataset dgl.data.CoraGraphDataset() graph dataset[0] graph.adjacency_matrix() 原因分…

Python 学习中什么是元组,如何使用元组?

什么是元组 元组&#xff08;Tuple&#xff09;是Python内置的一种数据结构&#xff0c;用于存储多个数据项。与列表类似&#xff0c;元组也可以存储不同类型的数据&#xff0c;但它们之间存在一个重要区别&#xff1a;元组是不可变的&#xff0c;也就是说&#xff0c;一旦创建…

笔记13:switch多分支选择语句

引例&#xff1a; 输入1-5中的任意一共数字&#xff0c;对应的打印字符A,B,C,D,E int num 0; printf("Input a number[1,5]:"); scanf("%d"&#xff0c;&num); if( num 1)printf("A\n"); else if(num2)printf("B\n"); else i…

文化财经macd顶底背离幅图指标公式源码

DIFF:EMA(CLOSE,12) - EMA(CLOSE,26); DEA:EMA(DIFF,9); MACD:2*(DIFF-DEA),COLORSTICK; JC:CROSS(DIFF,DEA); SC:CROSSDOWN(DIFF,DEA); N1:BARSLAST(JC)1; N2:BARSLAST(SC)1; HH:VALUEWHEN(CROSSDOWN(DIFF,DEA),HHV(H,N1));//上次MACD红柱期间合约最大值 HH2:VALUEWHE…

HTML(26)——平面转换-旋转-多重转换-缩放

旋转 属性&#xff1a;transform:rotate(旋转角度) 角度的单位是deg。 取值为正&#xff0c;顺时针旋转取值为负&#xff0c;逆时针旋转 默认情况下&#xff0c;旋转的原点是盒子中心点 改变旋转的原点可以使用属性:transform-origin:水平原点位置 垂直原点位置 取值&a…

springboot+vue原创歌曲分享平台 LW +PPT+源码+讲解

3 平台分析 3.1平台可行性分析 3.1.1经济可行性 由于本平台是作为毕业设计平台&#xff0c;且平台本身存在一些技术层面的缺陷&#xff0c;并不能直接用于商业用途&#xff0c;只想要通过该平台的开发提高自身学术水平&#xff0c;不需要特定服务器等额外花费。所有创造及工作…

[BJDCTF 2nd]简单注入

sqlsqlsqlsqlsql又来喽 过滤了单双引号&#xff0c;等于符号&#xff0c;还有select等&#xff0c;但是这里没有二次注入 。扫描发现hint.txt 看出题人的意思是&#xff0c;得到密码即可获得flag。 select * from users where username$_POST["username"] and passw…

编写优雅Python代码的20个最佳实践

想要让你的代码像艺术品一样既实用又赏心悦目吗&#xff1f;今天我们就来聊聊如何通过20个小技巧&#xff0c;让你的Python代码从平凡走向优雅&#xff0c;让同行看了都忍不住点赞&#xff01; **温馨提示&#xff1a;更多的编程资料&#xff0c;领取方式在&#xff1a; 1. 拥…

最小代价生成树实现(算法与数据结构设计)

课题内容和要求 最小代价生成树的实现&#xff0c;分别以普利姆算法和克鲁斯卡尔算法实现最小代价生成树&#xff0c;并分析两种算法的适用场合。 数据结构说明 普利姆算法实现最小代价生成树的图采用邻接表存储结构&#xff0c;还有辅助数据结构&#xff0c;数组nearest&am…

Lambda架构

1.Lambda架构对大数据处理系统的理解 Lambda架构由Storm的作者Nathan Marz提出&#xff0c;其设计目的在于提供一个能满足大数据系统关键特性的架构&#xff0c;包括高容错、低延迟、可扩展等。其整合离线计算与实时计算&#xff0c;融合不可变性、读写分离和复杂性隔离等原则&…

揭秘“消费即收益”的循环购模式 商家智慧还是消费陷阱?

大家好&#xff0c;我是你们的电商策略顾问吴军。今天&#xff0c;我将带大家深入剖析一种新兴的商业模式——循环购模式&#xff0c;它以其独特的“消费赠礼、每日返利、提现自由”特性&#xff0c;在电商界掀起了不小的波澜。那么&#xff0c;这种模式究竟有何魅力&#xff1…

ip地址突然变了一个城市怎么办

在数字化日益深入的今天&#xff0c;IP地址不仅是网络连接的标识&#xff0c;更是我们网络行为的“身份证”。然而&#xff0c;当您突然发现您的IP地址从一个城市跳转到另一个城市时&#xff0c;这可能会引发一系列的疑问和担忧。本文将带您深入了解IP地址突变的可能原因&#…

Android ViewPostImeInputStage输入事件处理

InputDispatcher向InputChannel使用socket写入输入事件&#xff0c;触发InputEventReceiver调用来接收输入事件。 ViewPostImeInputStage处理view控件的事件 frameworks/base/core/java/android/view/InputEventReceiver.java dispatchInputEvent frameworks/base/core/jav…

Shell编程类-网站检测

Shell编程类-网站检测 面试题参考答法 a(1 2 3 4) echo ${a[0]} echo ${a[*]}这里声明一个数值&#xff0c;并选择逐个调用输出还是全部输出 curl -w %{http_code} urL/IPADDR常用-w选项去判断网站的状态&#xff0c;因为不加选择访问到的网站可能出现乱码无法判断是否网站down…

Nuxt框架中内置组件详解及使用指南(一)

title: Nuxt框架中内置组件详解及使用指南&#xff08;一&#xff09; date: 2024/7/6 updated: 2024/7/6 author: cmdragon excerpt: 本文详细介绍了Nuxt框架中的两个内置组件和的使用方法与功能。确保包裹的内容仅在客户端渲染&#xff0c;适用于处理浏览器特定功能或异步…

第1章 项目背景(学成在线),项目介绍,环境搭建

1.项目背景 1.1 在线教育市场环境 以下内容摘自https://report.iresearch.cn/content/2021/01/358854.shtml 在线教育行业是一个有着极强的广度和深度的行业&#xff0c;从校内到校外&#xff1b;从早幼教到职业培训&#xff1b;从教育工具到全信息化平台等等。 2020年的新…

智慧文旅(景区)解决方案PPT(42页)

智慧文旅解决方案摘要 行业分析中国旅游业正经历消费大众化、需求品质化、发展全域化和产业现代化的发展趋势。《“十三五”旅游业发展规划》的发布&#xff0c;以及文化和旅游部的设立&#xff0c;标志着旅游业的信息化和智能化建设成为国家战略。2018年推出的旅游行业安全防范…

Linux:Ubuntu18.04下开机自启动QT图形化界面

Linux&#xff1a;Ubuntu18.04下开机自启动QT图形化界面 Chapter1 Linux&#xff1a;Ubuntu18.04下开机自启动QT图形化界面一、创建rc.local文件二、建立rc-local.service文件三、启动服务查看启动状态四、重启 Chapter2 将QT应用作为开机自启动&#xff08;Linux系统&#xff…