Java | 一分钟掌握定时任务 | 9 - PowerJob分布式定时任务

news2025/1/10 17:19:41

作者:Mars酱

声明:本文章由Mars酱整理编写,部分内容来源于网络,如有疑问请联系本人。

转载:欢迎转载,转载前先请联系我!

前言

我们选择一套框架或者技术的时候,一定要知道它的特点和功能,不能为了(学习)技术而(选择)技术,那是对产品的不负责任。官方说有类似情况的,可以使用PowJob:

  • 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表、未支付订单超时取消等。
  • 有需要​​全部机器一同执行​​的业务场景:如使用广播执行模式清理集群日志。
  • 有需要​​分布式处理​​的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce 处理器完成任务的分发,调动整个集群加速计算。
  • 有需要​​延迟执行​​某些任务的业务场景:比如订单过期处理等。

架构图

这是官方提供的架构图:

从架构图颜色可以看出主体就分了两个大块和一小条:调度中心、执行器、Akka。

  • ​调度中心 powerjob-server​​:PowerJob 的设计目标为企业级的分布式任务调度平台,即成为调度中间件,让任意业务线的应用仅需要依赖 powerjob-worker 即可获取任务调度与分布式计算的能力。因此,PowerJob 的理想部署模式为一个公司统一部署 powerjob-server 集群,各业务线应用直接接入使用。
  • ​执行器 powerjob-worker​​:根据以前对定时任务的理解,用过Quartz的话,这里相当于Job这个接口;用过ElasticJob的话,最起码相当于Job接口中的一种,比如SimpleJob接口;用过xxl-job的话,这里也是同理,相当于使用了注解​​@XxlJob​​的方法。因此,所有需要执行的任务,mars酱的理解都需要依赖powerjob-worker的。
  • ​Akka ActorSystem​​:基于Actor模型设计的,专用于构建高度并发、分布式和弹性的工具包,号称单台机器上高达 200 亿条消息/秒。从架构图来看,PowerJob用来做数据交换传输,这么牛逼的中间协议处理者,看来PowerJob团队一定是想往大了搞的。

学习官方的例子

官方提供了一个示例,下载源代码之后,有个powerjob-worker-samples子工程,工程结构是这样:

工程依赖powerjob-worker-spring-boot-starter,工程文件夹中比较重点的就是processors文件夹了,给出了各种处理器实现的例子。处理器官方按照功能分了几种:

  • ​单机处理器 - BasicProcessor​​:单机执行的策略下,server 会在所有可用 worker 中选取健康度最佳的机器进行执行。
  • ​广播处理器 - BroadcastProcessor​​:广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在​​BasicProcessor​​ 的基础上额外增加了 ​​preProcess​​ 和 ​​postProcess​​ 方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。
  • ​并行处理器 - MapReduceProcessor​​:MapReduce 是最复杂也是最强大的一种执行器,它允许开发者完成任务的拆分,将子任务派发到集群中其他Worker 执行,是执行大批量处理任务的首选。
  • ​Map处理器 - MapProcessor​​:对应了Map任务,即某个任务在运行过程中,允许产生子任务并分发到其他机器进行运算。

process方法

BasicProcessor 的 process 方法基本上是所有任务需要实现的核心方法,表示需要执行任务的具体业务内容,其方法定义如下:

ProcessResult process(TaskContext context) throws Exception;

TaskContext入参

process参数TaskContext类似一个dto对象,里面定义了框架传递给具体业务内容所需的一些属性,如下:

属性名称意义/用法
jobId任务 ID,开发者一般无需关心此参数
instanceId任务实例 ID,全局唯一,开发者一般无需关心此参数
subInstanceId子任务实例 ID,秒级任务使用,开发者一般无需关心此参数
taskId采用链式命名法的 ID,在某个任务实例内唯一,开发者一般无需关心此参数
taskNametask 名称,Map/MapReduce 任务的子任务的值为开发者指定,否则为系统默认值,开发者一般无需关心此参数
jobParams任务参数对于非工作流中的任务其值等同于控制台录入的任务参数; 如果该任务为工作流中的任务且有配置节点参数信息,那么接收到的是节点配置的参数信息
instanceParams任务实例参数对于非工作流中的任务 其值 等同于 OpenAPI 传递的实例参数,非 OpenAPI 触发的任务则一定为空。 如果该任务为工作流中的任务那么这里实际接收到的是工作流上下文信息,建议使用 getWorkflowContext 方法获取上下文信息
maxRetryTimesTask 的最大重试次数
currentRetryTimesTask 的当前重试次数,和 maxRetryTimes 联合起来可以判断当前是否为该 Task 的最后一次运行机会
subTask子 Task,Map/MapReduce 处理器专属,开发者调用map方法时传递的子任务列表中的某一个
omsLogger在线日志,用法同 Slf4J,记录的日志可以直接通过控制台查看,非常便捷和强大!不过使用过程中需要注意频率,滥用在线日志会对 Server 造成巨大的压力
userContext用户在 PowerJobWorkerConfig 中设置的自定义上下文
workflowContext工作流WorkflowContext对象

这是它的源码:

@Getter
@Setter
@ToString
@Slf4j
public class TaskContext {

    private Long jobId;

    private Long instanceId;

    private Long subInstanceId;

    private String taskId;

    private String taskName;
    
    private String jobParams;
    
    private String instanceParams;
    
    private int maxRetryTimes;
    
    private int currentRetryTimes;
    
    private Object subTask;
    
    @JsonIgnore
    private OmsLogger omsLogger;
    
    private Object userContext;
    
    private WorkflowContext workflowContext;

}

返回值 ProcessResult

方法的返回值为 ​​ProcessResult​​,代表了本次任务执行的结果,包含 ​​success​​ 和 ​​msg​​ 两个属性,分别用于传递 Task 是否执行成功和 Task 需要返回的信息。

process方法被谁调用

mars酱选择官方例子中的SimpleProcessor任务,它实现了​​BasicProcessor​​的process方法,跟踪它被调用的地方,找到LightTaskTracker, 它的构造函数把任务提交给线程池调用,这是LightTaskTracker的构造函数:

public LightTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
        super(req, workerRuntime);
        try {
            taskContext = constructTaskContext(req, workerRuntime);
            // 1. 等待处理
            status = TaskStatus.WORKER_RECEIVED;
            // 2. 加载 Processor
            processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(req.getProcessorType()).setProcessorInfo(req.getProcessorInfo()));
            executeThread = new AtomicReference<>();
            long delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "15")) * 1000L;
            // 3. 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段
            long initDelay = RandomUtils.nextInt(5000, 10000);
            // 4. 上报任务状态
            statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS);
            // 5. 超时控制
            if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
                if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
                    timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
                } else {
                    // 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s
                    timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
                }
            } else {
                timeoutCheckScheduledFuture = null;
            }
            // 6. 提交任务到线程池
            processFuture = workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit(this::processTask);
        } catch (Exception e) {
            log.error("[TaskTracker-{}] fail to create TaskTracker for req:{} ", instanceId, req);
            destroy();
            throw e;
        }

    }

上面代码的第6步是通过PowerJob的ExecutorManager创建的一个线程池,并提交给线程池去执行,这是ExecutorManager的构造函数:

public ExecutorManager(PowerJobWorkerConfig workerConfig){


        final int availableProcessors = Runtime.getRuntime().availableProcessors();
        // 初始化定时线程池
        ThreadFactory coreThreadFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-core-%d").build();
        coreExecutor =  new ScheduledThreadPoolExecutor(3, coreThreadFactory);

        ThreadFactory lightTaskReportFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-light-task-status-check-%d").build();
        
        lightweightTaskStatusCheckExecutor =  new ScheduledThreadPoolExecutor(availableProcessors * 10, lightTaskReportFactory);

        ThreadFactory lightTaskExecuteFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-light-task-execute-%d").build();
        // 这里创建线程池,
        lightweightTaskExecutorService = new ThreadPoolExecutor(availableProcessors * 10,availableProcessors * 10, 120L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>((workerConfig.getMaxLightweightTaskNum() * 2),true), lightTaskExecuteFactory, new ThreadPoolExecutor.AbortPolicy());

    }

构造函数最后一行创建ThreadPoolExecutor,队列使用的ArrayBlockingQueue,失败策略使用的AbortPolicy策略,失败之后抛出异常。

其他任务调度框架

优秀的定时任务框架很多,单体架构的实现可选的不太多,一般也就spring task常用点,分布式架构的可选的很多,可以根据自己的需求选择不同的定时任务框架,以下还有几款名气也不小的:

  • ​big-whale​​:美柚app开源的任务调度框架,提供Spark、Flink等批处理任务的DAG调度和流处理任务的运行管理和状态监控,并具有Yarn应用管理、重复应用检测、大内存应用检测等功能。
  • ​Schedulis​​:微众银行基于 LinkedIn 的开源项目 Azkaban 开发的一款工作流任务调度系统,用于解决金融级场景下,大量批量作业任务的复杂依赖、灵活调度。
  • ​Oozie​​:工作流调度系统,用于管理Apache Hadoop作业。它与Hadoop堆栈的其余部分集成,支持多种类型的Hadoop作业(如Java map-reduce,Streaming map-reduce,Pig,Hive,Sqoop和Distcp)以及系统特定的作业(如Java程序和shell脚本)。

其他优秀的开源任务调度框架,大家可以去github或者gitee上搜索并学习一下。一分钟掌握定时任务,完结。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/571292.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电源大师课-初阶

第一课 电源系统构成和基础原理 1-电源效率 总的Pout除以Pin 2-输出电压调整率 源调整率(输入电压变化时&#xff0c;输出稳定程度) 负载调整率(输出负载变化时&#xff0c;输出稳定度) 温度调整率(工作环境温度在极限情况下&#xff0c;输出的稳定度) 3-纹波测试(20MHz、最…

KD7742电气安规综合测试仪

一、产品简介 KD7742电气安规综合测试仪具有交/直流耐压、绝缘电阻等项目的测试分析功能&#xff0c;能显示电压、电流和电阻的波形图以及趋势图&#xff0c;以便更直观的监测分析绝缘性能和绝缘崩溃时的各项指标&#xff0c;适用于高要求的测试分析场合。 产品具有测试参数范围…

郑州信源招标采购系统 定制

概述&#xff1a; 招标采购系统是郑州信源运用“互联网”、大数据、人工智能、区块链、物联网等新兴技术&#xff0c;结合供应链管理理念&#xff0c;以招标采购为核心&#xff0c;提供交易、管理、数据、服务、监管为一体的高标准采购管理平台&#xff0c;招标采购系统根据客户…

基于html+css的图片展示93

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

Qt编程基础 | 第六章-窗体 | 6.1、主窗体QMainWindow类

一、主窗体QMainWindow类 1.1、简介 QMainWindow是为用户提供主窗口程序的类&#xff0c;包含一个菜单栏&#xff08;menu bar&#xff09;、多个工具栏&#xff08;tool bars&#xff09;、多个锚接部件&#xff08;dock widgets&#xff09;、一个状态栏&#xff08;status …

基于jmeter完成压测

&#x1f3e0;个人主页&#xff1a;shark-Gao &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是shark-Gao&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f389;目前状况&#xff1a;23届毕业生&#xff0c;目前在某公司实习&#x1f…

FLEXPART拉格朗日粒子扩散模式建模技术及研究大气污染物源-汇关系中的实践经验与技巧

拉格朗日粒子扩散模式FLEXPART通过计算点、线、面或体积源释放的大量粒子的轨迹&#xff0c;来描述示踪物在大气中长距离、中尺度的传输、扩散、干湿沉降和辐射衰减等过程。 该模式既可以通过时间的前向运算来模拟示踪物由源区向周围的扩散&#xff0c;也可以通过后向运算来确…

剑指offer -- 二维数组中的查找

二维数组中的查找_牛客题霸_牛客网 (nowcoder.com) 暴力查找法: 是一种简单直接的解决方法&#xff0c;可以用于在二维数组中查找目标值。该方法的思路是遍历数组的每个元素&#xff0c;逐个与目标值进行比较。 具体步骤如下&#xff1a; 从数组的第一行第一列开始&#xff0c;…

Scala学习(十)---Set和Map

文章目录 1.Set集合1.1 不可变Set1.2 可变Set 2.Map2.1 不可变Map2.2 可变map 3.元组 1.Set集合 1.1 不可变Set 创建一个不可变set val setSet(1,2,3,4,6,5,4,4)println(set)//判断此set是否为不可变HashSetval bool set.isInstanceOf[HashSet[Int]]println(bool)运行&#…

考研数据结构--图

文章目录 图图的基本概念图的定义种类 图的抽象数据类型图的基本术语1. 端点和邻接点2. 顶点的度、入度和出度3. 完全图4. 稠密图、稀疏图5. 子图6. 路径和路径长度7. 回路或环8. 连通、连通图和连通分量9. 强连通图和强连通分量在一个图中找强连通分量的方法 10. 权和网 图的存…

如何在华为OD机试中获得满分?Java实现【统计匹配的二元组个数】一文详解!

✅创作者&#xff1a;陈书予 &#x1f389;个人主页&#xff1a;陈书予的个人主页 &#x1f341;陈书予的个人社区&#xff0c;欢迎你的加入: 陈书予的社区 &#x1f31f;专栏地址: Java华为OD机试真题&#xff08;2022&2023) 文章目录 1. 题目描述2. 输入描述3. 输出描述…

基于openfaas托管脚本的实践

作者 | 张曦 一、openfaas产品背景 在云服务架构发展之初&#xff0c;这个方向上的思路是使开发者不需要关心搭建和管理后端应用程序。这里并没有提及无服务器这个概念&#xff0c;而是指后端基础设施由第三方来托管&#xff0c;需要的基础架构组建均以服务的形式提供&#x…

list的模拟实现

第一步&#xff1a;看源代码 类的框架&#xff1a; 成员函数&#xff1a; 基本可以确定list是一个带头双向循环链表&#xff0c;end()的迭代器指向头节点&#xff0c;begin()的迭代器指向头结点的下一个节 list的迭代器&#xff1a;&#xff08;稍显复杂&#xff09; 库中的迭代…

5节点系统潮流计算-牛拉法和PQ分解法(代码+报告)

目录 1 主要内容 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该部分资料是牛拉法和PQ分解法两种潮流计算方法的代码和对应的资料&#xff0c;程序针对5节点系统&#xff0c;也可以自行修改节点和线路参数改成其他节点系统&#xff0c;程序通用性较强&#xff0c;注释清晰…

电脑没有声音了怎么恢复?3个实用方法分享!

案例&#xff1a;我想在电脑上看电影、听音乐&#xff0c;但是我发现电脑没有声音&#xff0c;这种情况让我感到很困扰&#xff0c;有没有解决的方法&#xff1f; 【我的电脑没有声音&#xff0c;这非常影响我的使用。电脑没有声音是什么问题&#xff1f;有没有小伙伴知道解决…

Recurrent Neural Network(循环神经网络)

目录 Slot Filling with RNN Elman Network & Jordan Network Bidirectional RNN LSTM(Long Short-term Memory) Example Learning Target LSTM GRU (Gated Recurrent Unit) More Applications Many to One Many to Many Speech Recognition Sequence to Sequ…

一大波物联网毕业设计选题推荐(配套源码、文档、开发板)

以下项目整体综合性比较强&#xff0c;更贴近于产品化&#xff0c;并且基本都包含微信小程序与物联网云平台的联动&#xff0c;每个项目均配套详细的项目开发文档、程序源码&#xff0c;非常适合作为物联网毕业设计选题。项目文档及源码在文章末尾可免费下载。 另外&#xff0…

新手上路——怎样给我的网站备案

怎样办理网站备案&#xff1a; 由于备案是在主机接入商处办理&#xff0c;通常在哪里买的网站空间在哪里提交备案。例如在西部数码开通的虚拟主机、云服务器、独立主机等业务后&#xff0c;再通过其平台提交备案申请。 1.主机业务开通成功后&#xff0c;打开备案平台网址&…

【uniapp】app端压窗屏设计

一、前言 众所周知&#xff0c;在app端中&#xff0c;普通的组件是无法覆盖原生组件&#xff0c;即使是官方提供的cover-view也只是在实体内容中覆盖一些原生的如地图。但是无法覆盖底部的tabbar。 二、了解层级关系 实际上app端每点击一次的层级是这样的&#xff0c;我们可…

spring security oauth2.0-authorization code

Oauth2.0 spring security 估计很多人都在用,里面有几种安全模式值得大家去摸索. oauth2.0 是一种授权鉴权的机制,主要是用来颁发令牌,验证令牌,刷新令牌. OAuth2.0是OAuth协议的延续版本&#xff0c;但不向前兼容OAuth 1.0(即完全废止了OAuth1.0). 2012年10月&#xff0c…