XXL-JOB的基本使用

news2025/1/12 18:06:26

1、执行器

1.1下边配置执行器

下边配置执行器,执行器负责与调度中心通信接收调度中心发起的任务调度请求。

1、首先在媒资管理模块的service工程添加依赖,在项目的父工程已约定了版本2.3.1

XML

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
</dependency>

2、(加在你要在哪个任务写程序,因为我用的是nacos网关,所以需要搭建。搭建在yml文件当中和我下面一样的)在nacos下的media-service-dev.yaml下配置xxl-job

YAML
xxl:
  job:
    admin:
      addresses:
http://localhost:8080/xxl-job-admin/
    executor:
      appname: media-process-service
      address:
      ip:
      port: 9999
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
    accessToken: default_token

注意配置中的appname这是执行器的应用名,稍后在调度中心配置执行器时要使用。

3、配置xxl-job的执行器

将示例工程下配置类拷贝到你要管理任务的service工程下

拷贝至:

4、下边进入调度中心添加执行器

点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。

添加成功:

到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器,下边准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动媒资管理模块的接口工程。

启动后观察日志,出现下边的日志表示执行器在调度中心注册成功

同时观察调度中心中的执行器界面

在线机器地址处已显示1个执行器。

1.2、执行任务

下边编写任务,任务类的编写方法参考示例工程,如下图:

在service包下新建jobhandler存放任务类,下边参考示例工程编写一个任务类

Java

package com.xuecheng.media.service.jobhander;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * XxlJob开发示例(Bean模式)
 *
 * 开发步骤:
 *      1、任务开发:在Spring Bean实例中,开发Job方法;
 *      2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
 *      3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
 *      4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
 *
 * @author xuxueli 2023-3-1 9:32:51
 */
@Component
public class SampleXxlJob {
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);


    /**
     * 1、简单任务示例(Bean模式)
     */
    @XxlJob("tetsJob")
    public void demoJobHandler() throws Exception {
        XxlJobHelper.log("开始执行。。");

        for (int i = 0; i < 5; i++) {
            XxlJobHelper.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        // default success
    }

}

下边在调度中心添加任务,进入任务管理

点击新增,填写任务信息

注意红色标记处:

调度类型选择Cron,并配置Cron表达式设置定时策略。

运行模式有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。

JobHandler任务方法名填写@XxlJob注解中的名称。

添加成功,启动任务

通过调度日志查看任务执行情况

下边启动媒资管理的service工程,启动执行器。

观察执行器方法的执行。

如果要停止任务需要在调度中心操作

任务跑一段时间注意清理日志

2、 分片广播

掌握了xxl-job的基本使用,下边思考如何进行分布式任务处理呢?如下图,我们会启动多个执行器组成一个集群,去执行任务。

执行器在集群部署下调度中心有哪些调度策略呢?

查看xxl-job官方文档,阅读高级配置相关的内容:

SQL
高级配置:
    - 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
        FIRST(第一个):固定选择第一个机器;
        LAST(最后一个):固定选择最后一个机器;
        ROUND(轮询):;
        RANDOM(随机):随机选择在线的机器;
        CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
        LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
        LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
        FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
        BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
        SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
    - 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度。
    - 调度过期策略:
        - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
        - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
    - 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
        单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
        丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
        覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
    - 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
    - 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;

第一个:每次调度选择集群中第一台执行器。

最后一个:每次调度选择集群中最后一台执行器。

轮询:按照顺序每次调度选择一台执行器去调度。

随机:每次调度随机选择一台执行器去调度。

CONSISTENT_HASH:按任务的hash值选择一台执行器去调度。

其它策略请自行阅读文档。

下边要重点说的是分片广播策略,分片是指是调度中心将集群中的执行器标上序号:0,1,2,3...,广播是指每次调度会向集群中所有执行器发送调度请求,请求中携带分片参数。

如下图:

每个执行器收到调度请求根据分片参数自行决定是否执行任务。

另外xxl-job还支持动态分片,当执行器数量有变更时,调度中心会动态修改分片的数量。

作业分片适用哪些场景呢?

  • 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
  • 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。

所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。

使用说明:

"分片广播" 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。

Java语言任务获取分片参数方式:

BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务"ShardingJobHandler":

ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();

分片参数属性说明:

index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;

total:总分片数,执行器集群的总机器数量;

2.1、下边测试作业分片:

1、定义作业分片的任务方法

Java

    /**
     * 2、分片广播任务
     */
    @XxlJob("shardingJobHandler")
    public void shardingJobHandler() throws Exception {

        // 分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();

//        System.out.println("=====shardIndex====="+shardIndex+"============"+"=====shardTotal====="+shardTotal+"============");

        log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
        log.info("开始执行第"+shardIndex+"批任务");


    }

2.2、在调度中心添加任务

高级配置说明:

Plain Text
 

添加成功:

启动任务,观察日志

下边启动两个执行器实例,观察每个实例的执行情况

首先在nacos中配置media-service的本地优先配置:

YAML
#配置本地优先
spring:
 cloud:
  config:
    override-none: true

将media-service启动两个实例

两个实例的在启动时注意端口不能冲突:

实例1 在VM options处添加:-Dserver.port=63051 -Dxxl.job.executor.port=9998

实例2 在VM options处添加:-Dserver.port=63050 -Dxxl.job.executor.port=9999

例如:

如果找不到就点击 Environment就会弹出

启动两个实例

观察任务调度中心,稍等片刻执行器有两个

观察两个执行实例的日志:

另一实例的日志如下:

从日志可以看每个实例的分片序号不同。

到此作业分片任务调试完成,此时我们可以思考:

当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?

3、作业分片方案

掌握了xxl-job的作业分片调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。

任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会重复执行任务?

执行器收到调度请求后各自己查询属于自己的任务,这样就保证了执行器之间不会重复执行任务。

xxl-job设计作业分片就是为了分布式执行任务,XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号并向执行器传递分片总数、分片序号这些参数,开发者需要自行处理分片项与真实数据的对应关系。

下图表示了多个执行器获取视频处理任务的结构:

每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。

上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:

1  %  2 = 1    执行器2执行

2  %  2 =  0    执行器1执行

3  %  2 =  1     执行器2执行

以此类推.

3、1 保证任务不重复执行

通过作业分片的方式保证了执行器之间分配的任务不重复,如果同一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?

首先配置调度过期策略:

查看文档如下:

    - 调度过期策略:
        - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
        - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
    - 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;

这里我们选择忽略,如果立即执行一次可能会重复调度。

其次,再看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束,此时调度时间到该如何处理。

查看文档如下:
        单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
        丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
        覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

这里选择 丢弃后续调度,避免重复调度。

最后,也就是要注意保证任务处理的幂等性,什么是任务的幂等性?任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。执行器接收调度请求去执行任务,要有办法去判断该任务是否处理完成,如果处理完则不再处理,即使重复调度处理相同的任务也不能重复处理相同的视频。

什么是幂等性?

它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。

幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。

解决幂等性常用的方案:

1)数据库约束,比如:唯一索引,主键。

2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。

3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。

这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/379621.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Web安全社工篇】——水坑攻击

作者名&#xff1a;白昼安全主页面链接&#xff1a; 主页传送门创作初心&#xff1a; 以后赚大钱座右铭&#xff1a; 不要让时代的悲哀成为你的悲哀专研方向&#xff1a; web安全&#xff0c;后渗透技术每日鸡汤&#xff1a;努力赚钱不是因为爱钱“水坑攻击”&#xff0c;黑客攻…

CVPR 2023 接收结果出炉!再创历史新高!录用2360篇!(附10篇最新论文)

点击下方卡片&#xff0c;关注“CVer”公众号AI/CV重磅干货&#xff0c;第一时间送达点击进入—>【计算机视觉】微信技术交流群2023 年 2 月 28 日凌晨&#xff0c;CVPR 2023 顶会论文接收结果出炉&#xff01;这次没有先放出论文 ID List&#xff0c;而是直接 email 通知作…

【C语言】位段

位段 一.简介 位段和结构体很相似。不同的是&#xff1a; 位段的成员&#xff1a;成员名 : 数字且其成员必须是整型(char、int、unsigned int……) 示例&#xff1a; struct S {char a : 3;char b : 2;char c : 7; };S就是一个位段类型&#xff0c;其成员a为占3个比特位的…

【趣味学Python】Python基础语法讲解

目录 编码 标识符 python保留字 注释 实例(Python 3.0) 实例(Python 3.0) 行与缩进 实例(Python 3.0) 实例 多行语句 数字(Number)类型 字符串(String) 实例(Python 3.0) 空行 等待用户输入 实例(Python 3.0) 同一行显示多条语句 实例(Python 3.0) 多个语句构…

【Day02数据结构 空间复杂度】

最近太忙了都好久没有更新博客了,太难了,抽空水一篇文章,大佬们多多支持. 上篇:时间复杂度分析 目录 前言 一、空间复杂度概念&#xff1f; 二、实例展示 三、.有复杂度要求的算法题练习 1.题目链接&#xff1a;力扣--消失的数字 2.题目链接&#xff1a;力扣--旋转数组 总结: 1…

去课工场成都基地学Java,可行吗?

当然可行&#xff0c;不管是你选择自学Java&#xff0c;还是去培训机构学习都是非常不错的职业选择。选择好赛道能让你的未来收获更多。 2023年了&#xff0c;随着数字经济的发展&#xff0c;互联网已经渗入我们生活工作的方方面面&#xff0c;现在即使是吃个饭点个餐很多时候…

SpringBoot解决跨域方式

跨域是指在 Web 应用中&#xff0c;一个服务器资源或应用访问另一个服务器资源或应用的资源时候。由于浏览器的同源策略&#xff0c;一般情况下同一个域中的网站或应用可以互相访问资源&#xff0c;但跨域访问会被浏览器拒绝。浏览器出于安全考虑&#xff0c;会限制跨域访问&am…

深度学习领域的多任务学习综述

文章目录前言1. 什么是多任务学习&#xff1f;2. 为何要使用多任务学习&#xff1f;3. 多任务学习有哪些类型&#xff1f;3.1 基于硬参数共享的多任务学习3.2 基于软参数共享的多任务学习4. 为什么多任务学习能提升模型的性能&#xff1f;4.1 隐藏数据扩充&#xff08;Implicit…

关于sudo配置

前言这里做一个小补充&#xff0c;主要讲一下关于利用sudo对指令提权以及普通用户无法使用sudo指令的问题。在前面的文章【Linux】一文掌握Linux权限中&#xff0c;我们讲到了关于权限的一些问题。我们知道root身份下&#xff0c;一切畅通无阻&#xff0c;而权限只是用来限制我…

urp SpotLight 衰减方式扩展

背景&#xff1a; 解决默认spotLight 的衰减模式下&#xff0c; 在距离灯光特别近的时候&#xff0c;灯光过爆的情况 解决方法&#xff1a; 修改SpotLight的衰减方式 下图是unity给出的几种衰减模式以及图示&#xff1a; 其中InverseSquare是当前2021.2 unity版本中urp(12.1…

相恨见晚的办公插件神器,颠覆我们对辅助工具的认知

不坑盒子 这是一个非常好用的插件工具&#xff0c;专门应用在Word文档和wps&#xff0c;支持Office 2010以上的版本&#xff0c;操作也简单且实用。 不坑盒子下载及使用说明 一键排版功能 像是下面的自动排版功能&#xff0c;可以在配置里面先设定好需要的格式&#xff0c;…

结合java中的锁聊聊锁的本质

在操作系统里面&#xff0c;也会遇到什么信号量、互斥量&#xff0c;然后说利用互斥量、信号量可以实现锁的功能&#xff0c;而操作系统提供的原语有又mutex锁在学习数据库的时候&#xff0c;什么表锁、行锁、读锁、写锁、排它锁、意向锁、meta锁等等&#xff0c;各种各样的锁的…

mysql数据库limit的四种用法

文章目录前言一、语法二、参数说明三、常用示例-4种用法总结前言 mysql数据库中limit子句可以被用于强制select语句返回指定的记录数。limit接受一个或两个数字参数。参数必须是一个整数常量。如果给定两个参数&#xff0c;第一个参数指定第一个返回记录行的偏移量&#xff0c…

实践数据湖iceberg 第四十一课 iceberg的实时性-业界的checkpoint配置

系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中&#xff0c;以sql方式从kafka读数据到iceberg 实践数据湖iceberg 第四课 在sqlclient中&#xff0c;以sql方式从kafka读数据到…

硬件系统工程师宝典(12)-----EMC应该知道的事

各位同学大家好&#xff0c;欢迎继续做客电子工程学习圈&#xff0c;今天我们继续来讲这本书&#xff0c;硬件系统工程师宝典。上篇我们说到在做电源完整性分析时去耦电容要遵循的规则&#xff0c;大电容的去耦半径大&#xff0c;小电容的去耦半径小&#xff0c;电容焊盘扇出时…

2023年java春招面试题及答案

2023年java春招面试题1、下面有关jdbc statement的说法错误的是&#xff1f;2、下面有关JVM内存&#xff0c;说法错误的是&#xff1f;3、下面有关servlet service描述错误的是&#xff1f;4、下面有关servlet和cgi的描述&#xff0c;说法错误的是&#xff1f;5、下面有关SPRIN…

Radio Link Monitoring(RLM)

欢迎关注微信同步公众号“modem协议笔记”。 这篇看下radio link monitoring相关的内容&#xff0c;就是UE进行DL radio link quality监听的规定&#xff0c;这部分与RLF的判定息息相关。市面上讲NR相关的书籍&#xff0c;多少都会涉及这部分内容&#xff0c;可能spec上这块的…

pdf免费转换工具,只需记住这3款就够了

PDF格式的文档在人们的办公过程中扮演着非常重要的角色&#xff0c;而PDF格式的文档之所以受到人们的青睐&#xff0c;是因为它不容易被篡改&#xff0c;可以用多种阅读器打开浏览。然而&#xff0c;在实际的应用过程中&#xff0c;我们不仅需要阅读PDF文档&#xff0c;也经常还…

HTTP压力测试概论

常用的HTTP服务压测工具介绍 在项目正式上线之前&#xff0c;我们通常需要通过压测来评估当前系统能够支撑的请求量、排查可能存在的隐藏bug&#xff0c;同时了解了程序的实际处理能力能够帮我们更好的匹配项目的实际需求&#xff0c;节约资源成本。 HTTP服务压力测试工具 在…

如何使用403bypasser绕过目标页面上的访问控制限制

关于403bypasser 403bypasser是一款自动化工具&#xff0c;该工具能够以自动化的形式实现针对目标页面的访问控制限制绕过技术。403bypasser项目目前仍处于积极开发阶段&#xff0c;并且还会增加新的功能。 该工具基于Python语言开发&#xff0c;因此具备良好的跨平台特性。 …