分布式任务调度 - PowerJob

news2025/1/24 11:40:25

一、简介

1、介绍

PowerJob(原OhMyScheduler)是全新一代分布式任务调度与计算框架,其主要功能特性如下:

  • 使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。
  • 定时策略完善:支持 CRON 表达式固定频率固定延迟API四种定时调度策略。
  • 执行模式丰富:支持单机、广播、Map、MapReduce 四种执行模式,其中 Map/MapReduce 处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。
  • 工作流支持:支持在线配置任务依赖关系(DAG),以可视化的方式对任务进行编排,同时还支持上下游任务间的数据传递,以及多种节点类型(判断节点 & 嵌套工作流节点)。
  • 执行器支持广泛:支持 Spring Bean、内置/外置 Java 类,另外可以通过引入官方提供的依赖包,一键集成 Shell、Python、HTTP、SQL 等处理器,应用范围广。
  • 运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低 debug 成本,极大地提高开发效率。
  • 依赖精简:最小仅依赖关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer…)
  • 高可用 & 高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
  • 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。

1.1、适用场景

  • 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表、未支付订单超时取消等。
  • 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
  • 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce 处理器完成任务的分发,调动整个集群加速计算。
  • 有需要延迟执行某些任务的业务场景:比如订单过期处理等。

1.2、设计目标

PowerJob的设计目标为企业级的分布式任务调度平台,即成为公司内部的任务调度中间件。整个公司统一部署调度中心powerjob-server,旗下所有业务线应用只需要依赖 powerjob-worker即可接入调度中心获取任务调度与分布式计算能力。
同类产品对比

1.3、同类产品对比

项目QuartZxxl-jobSchedulerX 2.0PowerJob
定时类型CRONCRONCRON、固定频率、固定延迟、OpenAPICRON、固定频率、固定延迟、OpenAPI
任务类型内置Java内置Java、GLUE Java、Shell、Python等脚本内置Java、外置Java(FatJar)、Shell、Python等脚本内置Java、外置Java(容器)、Shell、Python等脚本
分布式任务静态分片MapReduce 动态分片MapReduce 动态分片
在线任务治理不支持支持支持支持
日志白屏化不支持支持不支持支持
调度方式及性能基于数据库锁,有性能瓶颈基于数据库锁,有性能瓶颈不详无锁化设计,性能强劲无上限
报警监控邮件短信邮件,提供接口允许开发者扩展
系统依赖关系型数据库(MySQL、Oracle…)MySQL人民币任意 Spring Data Jpa支持的关系型数据库(MySQL、Oracle…)
DAG 工作流不支持不支持支持支持

2、基本概念

2.1、分组概念:

  • appName:应用名称,建议与用户实际接入 PowerJob 的应用名称保持一致,用于业务分组与隔离一个 appName 等于一个业务集群,也就是实际的一个 Java 项目。

2.2、核心概念:

  • 任务(Job):描述了需要被PowerJob调度的任务信息,包括任务名称、调度时间、处理器信息等。

  • 任务实例( JobInstance,简称 Instance):任务(Job)被调度执行后会生成任务实例(Instance),任务实例记录了任务的运行时信息(任务与任务实例的关系类似于类与对象的关系)。

  • 作业(Task):任务实例的执行单元,一个JobInstance存在至少一个Task,具体规则如下:
    ○ 单机任务(STANDALONE):一个 JobInstance 对应一个 Task
    ○ 广播任务(BROADCAST):一个 JobInstance 对应 N 个 Task,N为集群机器数量,即每一台机器都会生成一个 Task
    ○ Map/MapReduce任务:一个 JobInstance 对应若干个 Task,由开发者手动 map 产生

  • 工作流(Workflow):由 DAG(有向无环图)描述的一组任务(Job),用于任务编排。

  • 工作流实例(WorkflowInstance):工作流被调度执行后会生成工作流实例,记录了工作流的运行时信息。

2.3、扩展概念

  • JVM 容器:以 Maven 工程项目的维度组织一堆 Java 文件(开发者开发的众多 Java 处理器),可以通过前端网页动态发布并被执行器加载,具有极强的扩展能力和灵活性。
  • OpenAPI:允许开发者通过接口来完成手工的操作,让系统整体变得更加灵活。开发者可以基于 API 便捷地扩展PowerJob 原有的功能。

2.4、定时任务类型

  • API:该任务只会由powerjob-client中提供的OpenAPI接口触发,server 不会主动调度。
  • CRON:该任务的调度时间由 CRON 表达式指定。
  • 固定频率:秒级任务,每隔多少毫秒运行一次,功能与 java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate 相同。
  • 固定延迟:秒级任务,延迟多少毫秒运行一次,功能与 java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay 相同。
  • 工作流:该任务只会由其所属的工作流调度执行,server 不会主动调度该任务。如果该任务不属于任何一个工作流,该任务就不会被调度。

备注:固定延迟和固定频率任务统称秒级任务,这两种任务无法被停止,只有任务被关闭或删除时才能真正停止任务。

3、官方

官网:http://www.powerjob.tech/
文档:https://www.yuque.com/powerjob/guidence/intro
项目地址
PowerJob 主项目:https://github.com/PowerJob/PowerJob
PowerJob 前端项目:https://github.com/PowerJob/PowerJob-Console
PowerJob 官网项目:https://github.com/PowerJob/Official-Website

├── LICENSE
├── powerjob-client // powerjob-client,普通Jar包,提供 OpenAPI
├── powerjob-common // 各组件的公共依赖,开发者无需感知
├── powerjob-server // powerjob-server,基于SpringBoot实现的调度服务器
├── powerjob-worker // powerjob-worker, 普通Jar包,接入powerjob-server的应用需要依赖该Jar包
├── powerjob-worker-agent // powerjob-agent,可执行Jar文件,可直接接入powerjob-server的代理应用
├── powerjob-worker-samples // 教程项目,包含了各种Java处理器的编写样例
├── powerjob-worker-spring-boot-starter // powerjob-worker 的 spring-boot-starter ,spring boot 应用可以通用引入该依赖一键接入 powerjob-server
├── powerjob-official-processors // 官方处理器,包含一系列常用的 Processor,依赖该 jar 包即可使用
├── others
└── pom.xml

二、实战

1、本地部署

1.1、下载项目

git clone https://github.com/PowerJob/PowerJob.git

1.2、导入 IDE,源码结构如下,我们需要启动调度服务器(powerjob-server),同时在 samples工程中编写自己的处理器代码

在这里插入图片描述

1.3、启动调度服务器

  1. 创建数据库(仅需要创建数据库):找到你的 DB,运行 SQL
CREATE DATABASE IF NOT EXISTS `powerjob-daily` DEFAULT CHARSET utf8mb4

搞定~
2. 修改配置文件,配置文件的说明官方文档写的非常详细,此处不再赘述。
需要修改的地方为数据库配置

  • spring.datasource.core.jdbc-url
  • spring.datasource.core.username
  • spring.datasource.core.password

当然,有 mongoDB 的同学也可以修改spring.data.mongodb.uri以获取完全版体验。

powerjob-server 日常环境配置文件:application-daily.properties

oms.env=DAILY
logging.config=classpath:logback-dev.xml

####### 外部数据库配置(需要用户更改为自己的数据库配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5

####### mongoDB配置,非核心依赖,通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily

####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) #######
spring.mail.host=smtp.163.com
spring.mail.username=zqq@163.com
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true

####### 资源清理配置 #######
oms.instanceinfo.retention=1
oms.container.retention.local=1
oms.container.retention.remote=-1

####### 缓存配置 #######
oms.instance.metadata.cache.size=1024
  1. 完成配置文件的修改后,可以直接通过启动类 tech.powerjob.server.PowerJobServerApplication 启动调度服务器,观察启动日志,查看是否启动成功~启动成功后,访问 http://127.0.0.1:7700/ ,如果能顺利出现 Web 界面,则说明调度服务器启动成功!
  2. 注册应用:点击主页应用注册按钮,填入 powerjob-agent-test 和控制台密码(用于进入控制台),注册示例应用(当然你也可以注册其他的 appName,只是别忘记在示例程序中同步修改~)

2、编写示例代码

进入示例工程(powerjob-worker-samples),修改配置文件连接powerjob-server并编写自己的处理器代码。

  1. 修改 powerjob-worker-samplesapplication.properties,将 powerjob.worker.app-name 改为刚刚在控制台注册的名称。
server.port=8081

spring.jpa.open-in-view=false

########### powerjob-worker 配置 ###########
# akka 工作端口,可选,默认 27777
powerjob.worker.akka-port=27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
powerjob.worker.app-name=powerjob-agent-test
# 调度服务器地址,IP:Port 或 域名,多值逗号分隔
powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701
# 持久化方式,可选,默认 disk
powerjob.worker.store-strategy=disk
# 任务返回结果信息的最大长度,超过这个长度的信息会被截断,默认 8192
powerjob.worker.max-result-length=4096
# 单个任务追加的工作流上下文最大长度,超过这个长度的会被直接丢弃,默认 8192
powerjob.worker.max-appended-wf-context-length=4096
  1. 编写自己的处理器:随便找个地方新建类,继承你想要使用的处理器(各个处理器的介绍可见官方文档,文档非常详细),这里为了简单演示,选择使用单机处理器 BasicProcessor,以下是代码示例。
@Slf4j
@Component
public class MyStandaloneProcessorDemo implements BasicProcessor {
    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();
        omsLogger.info("处理器启动成功,context 是 {}.", context);

        log.info("单机处理器正在处理");
        log.info(context.getJobParams());

        omsLogger.info("处理器执行结束");

        boolean success = true;

        return new ProcessResult(success, context + ": " + success);
    }
}

  1. 启动示例程序,即直接运行主类 tech.powerjob.samples.processors.test.MyStandaloneProcessorDemo,观察控制台输出信息,判断是否启动成功。

3、任务的配置与运行

调度服务器与示例工程都启动完毕后,再次前往Web页面( http://127.0.0.1:7700/ ),进行任务的配置与运行。

  1. 在首页输入框输入配置的应用名称,成功操作后会正式进入前端管理界面。
    在这里插入图片描述
  2. 点击任务管理 -> 新建任务(右上角),开始创建任务。
    在这里插入图片描述
  • 任务名称:名称
  • 任务描述:描述
  • 任务参数:任务处理时能够获取到的参数(即各个 Processor 的 process 方法入参 TaskContext 对象的jobParams 属性)(进行一次处理器开发就能理解了)
  • 定时信息:该任务的触发方式,由下拉框和输入框组成
    ○ API -> 不需要填写任何参数,表明该任务由 OpenAPI 触发,不会被调度器主动调度执行
    ○ CRON -> 填写 CRON 表达式(在线生成网站)
    ○ 固定频率 -> 任务以固定的频率执行,填写整数,单位毫秒
    ○ 固定延迟 -> 任务以固定的延迟执行,填写整数,单位毫秒
    ○ 工作流 -> 不需要填写任何参数,表明该任务由工作流(workflow) 触发
  • 执行配置:由执行类型(单机、广播和 MapReduce )、处理器类型和处理器参数组成,后两项相互关联。
    ○ 内置Java处理器 -> 填写该处理器的全限定类名(eg, tech.powerjob.samples.processors.MapReduceProcessorDemo
    ○ Java容器 -> 填写容器ID#处理器全限定类名(eg,1#cn.edu.zju.oms.container.ContainerMRProcessor
    ○ SHELL、Python、SQL 、HTTP 等任务的执行:点击查看官方处理器的使用教程
  • 运行配置
    ○ 最大实例数:该任务同时执行的数量,0 代表不限制实例数量
    ○ 单机线程并发数:该实例执行过程中每个 Worker 使用的线程数量(MapReduce 任务生效,其余无论填什么,都只会使用必要的线程数)
    ○ 运行时间限制:限定任务的最大运行时间,超时则视为失败,单位毫秒,0 代表不限制超时时间(不建议不限制超时时间)。
  • 重试配置:
    ○ Instance 重试次数:实例级别,失败了整个任务实例重试,会更换 TaskTracker(本次任务实例的Master节点),代价较大,大型Map/MapReduce慎用。
    ○ Task 重试次数:Task 级别,每个子 Task 失败后单独重试,会更换 ProcessorTracker(本次任务实际执行的 Worker 节点),代价较小,推荐使用。
    ○ 注:请注意同时配置任务重试次数和子任务重试次数之后的重试放大,比如对于单机任务来说,假如任务重试次数和子任务重试次数都配置了 1 且都执行失败,实际执行次数会变成 4 次!推荐任务实例重试配置为 0,子任务重试次数根据实际情况配置。
  • 机器配置:用来标明允许执行任务的机器状态,避开那些摇摇欲坠的机器,0 代表无任何限制。
    ○ 最低 CPU 核心数:填写浮点数,CPU 可用核心数小于该值的 Worker 将不会执行该任务。
    ○ 最低内存(GB):填写浮点数,可用内存小于该值的 Worker 将不会执行该任务。
    ○ 最低磁盘(GB):填写浮点数,可用磁盘空间小于该值的 Worker 将不会执行该任务。
  • 集群配置
    ○ 执行机器地址:指定集群中的某几台机器执行任务( debug 的好帮手),多值英文逗号分割,如192.168.1.1:27777,192.168.1.2:27777
    ○ 最大执行机器数量:限定调动执行的机器数量
  • 报警配置:选择任务执行失败后报警通知的对象,需要事先录入。
  1. 完成任务创建后,即可在控制台看到刚才创建的任务,如果觉得等待调度太过于漫长,可以直接点击运行按钮,立即运行本任务。
    在这里插入图片描述
    运行结果
    在这里插入图片描述
    执行记录
    在这里插入图片描述
    在这里插入图片描述

4、问题及解决

问题:我设置的是CRON,每5秒执行,但它实际15秒执行一次。

原因:Powerjob的CRON模式下,时间轮是15秒扫描一次,这时间间隔不能小于15秒。若低于15秒会按15秒运行。

解决方案:改为固定频率或固定延迟进行。

修改设置如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/105146.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库原理及MySQL应用 | 约束

约束是保证数据完整性的一种数据库对象,按约束作用不同,分为七种。 约束从字面上来看就是受到限制,它是附加在表上,通过限制列中、行中、表之间数据来保证数据完整性的一种数据库对象。 在MySQL中,有多种约束&#xf…

设计模式原则 - 开闭原则(五)

开闭原则一 官方定义基本介绍二 案例演示普通实现方式案例分析开闭原则实现案例分析三 注意事项一 官方定义 开闭原则( Open Close Principle ),又称为OCP原则,他的官方定义如下: Software entities like classes,modu…

基于Java+Swing+Mysql实现停车场管理系统

基于JavaSwingMysql实现停车场管理系统一、系统介绍二、系统展示三、其它1.其他系统实现一、系统介绍 1.系统功能 用户 1.登录系统 2.信息查询 包含计费标准,当前在场信息,用户历史信息,用户个人信息,出入场信息,当前…

Win10提示错误代码0xc0000001的解决办法

​有一些朋友在使用Win10系统的时候会遇到蓝屏故障,提示“无法正常启动你的电脑,在多次尝试后,你的电脑上的操作系统仍无法启动,因此需求对其进行修复。” Win10提示恢复无法正常启动你的电脑0xc0000001 故障原因: 错误…

实战案例:初探工程配置 图标组件热身

点击上方卡片“前端司南”关注我您的关注意义重大原创前端司南前言本文是 基于ViteAntDesignVue打造业务组件库[1] 专栏第 3 篇文章【实战案例:初探工程配置 & 图标组件热身】,我将从业务系统中最基础的图标组件入手,带着读者们练练手找找…

websocket的用处及vue和SpringBoot和nginx的引入-入门

websocket的用处及vue和SpringBoot的引入-入门 为什么要有websocket 微信 想一个场景,扫码登录,服务器并不知道用户有没有扫码,怎么办,一种办法是HTTP定时轮询,1-2秒就请求一次服务端,看看用户有没有扫码…

5.3 常见的电感式和电容式感测原理及应用

常见的电感式和电容式感测应用1、电感式和电容式工作原理1.1 电感式感测工作原理1.2 电容式感测工作原理2 FDC:电容式液位感测2.1 电容技术在液位感测中的优势2.2 电容式液位感测入门3 LDC:电感式触控按钮4 LDC:增量编码器和事件计数5 LDC&am…

再学C语言10:字符串(1)

一、字符串定义 字符串:一个或多个字符的序列 "hello world!" 双引号并不是字符串的一部分,只是用于通知编译器其中包含了一个字符串 C没有为字符串定义专门的变量类型,而是将其存储在char数组中 字符串中的字符存放在相邻的存…

Amazon 4.7 星评,领域新经典,了解服务设计就读它

2011 年,Adaptive Path 公司的 Brandon Schauer 粗略估算,美国每年在服务的规划和设计上大约花费 20 亿美元,但其中仅有 7000 万美元(大约 3.5%)花在了“服务设计”上。做另外 96.5% 的工作的那些人,从不觉…

参加大学生数学建模大赛,Matlab和Python到底哪个更好?

前言 后台的小伙伴经常会问编程过程中,MATLAB和Python到底哪个更好?这个问题一直困惑很多同学,今天小编来给大家从实用型来综合分析一下: 首先从两者各自的应用做个对比。 一、python的优势 Python相对于Matlab最大的优势&…

Mac M2芯 k8s(minikube)超详细实战 - 单节点部分

概述 我使用的电脑是Mac pro M2芯的,使用的虚拟环境是 Ubuntu 22.04 ,M2芯兼容性不是特别好,所以尽量跟我博客中的版本保持一致。 虚拟机环境 Ubuntu 22.04docker :20.10.17minikube:v1.25.2 搭建minikube虚拟机环境…

【强化学习基础】强化学习的基本概念:状态、动作、智能体、策略、奖励、状态转移、轨迹、回报、价值函数

文章目录1.状态(State)2.动作(Action)3.智能体(Agent)4.策略(Policy)5.奖励(Reward)6.状态转移(State transition)7.智能体与环境交互…

高效率的Python开发工具——PyCharm v2022.3正式发布

JetBrains PyCharm是一种Python IDE,其带有一整套可以帮助用户在使用Python语言开发时提高其效率的工具。此外,该IDE提供了一些高级功能,以用于Django框架下的专业Web开发。 PyCharm v2022.3官方正式版下载(q技术交流:786598704)…

wireshark抓包数据提取TCP/UDP/RTP负载数据方法

wireshark抓包数据提取TCP_UDP_RTP负载数据方法 文章目录wireshark抓包数据提取TCP_UDP_RTP负载数据方法1 背景2 TCP和UDP负载提取方式3 RTP负载提取方式1 背景 在视频抓包分析过程中,有时候需要从TCP、UDP、RTP中直接提取payload数据,比如较老的摄像机…

微课堂助力在线教育招生引流方式_付费视频系统搭建对在线教育的作用

一、借助优惠码线上线下推广课程 1、线下发传单: 机构先在我们后台创建对应课程的通用优惠码,然后再制作课程传单介绍页。传单上显示出对应课程的通用优惠码,线下派发传单给到用户。 2、线下刮刮卡片推广:将私有码制作成卡片配合…

SVM(二)对偶问题转化以及求解

上篇: SVM(支持向量机)(一)基本形式推导 凸优化 考虑如下优化问题: 应用拉格朗日乘子法: 定义拉格朗日对偶函数G\mathcal{G}G,这里 infinfinf 是上确界(集合的最小上…

超标量处理器设计——第十章_提交

参考《超标量处理器》姚永斌著 文章目录超标量处理器设计——第十章_提交10.1 概述10.2 重排序缓存10.2.1 一般结构10.2.2 端口需求10.3 管理处理器的状态10.3.1 使用ROB管理指令集定义的状态10.3.2 使用物理寄存器管理指令集定义的状态10.4 特殊情况处理10.4.1 分支预测失败的…

【微服务】Nacos ⼀致性协议

目录 一、为什么 Nacos 需要⼀致性协议 二、为什么 Nacos 选择了 Raft 以及 Distro 1、从服务注册发现来看 2、从配置管理来看 3、为什么是 Raft 和 Distro 呢 三、早期的 Nacos ⼀致性协议 四、当前 Nacos 的⼀致性协议层 💖 Spring家族及微服务系列文章 …

【关于时间序列的ML】项目 2 :使用机器学习预测股票价格

🔎大家好,我是Sonhhxg_柒,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流🔎 📝个人主页-Sonhhxg_柒的博客_CSDN博客 📃 🎁欢迎各位→点赞…

5G无线技术基础自学系列 | RF优化流程

素材来源:《5G无线网络规划与优化》 一边学习一边整理内容,并与大家分享,侵权即删,谢谢支持! 附上汇总贴:5G无线技术基础自学系列 | 汇总_COCOgsta的博客-CSDN博客 在介绍完了RF优化的思路和措施之后&…