分布式任务调度

news2025/3/31 5:26:25

今天我们讲讲分布式定时任务调度—ElasticJob。

一、概述

1、什么是分布式任务调度

我们可以思考⼀下下⾯业务场景的解决⽅案:

  • 某电商平台需要每天上午10点,下午3点,晚上8点发放⼀批优惠券

  • 某银⾏系统需要在信⽤卡到期还款⽇的前三天进⾏短信提醒

  • 某财务系统需要在每天凌晨0:10分结算前⼀天的财务数据,统计汇总

以上场景就是任务调度所需要解决的问题

任务调度是为了⾃动完成特定任务,在约定的特定时刻去执⾏任务的过程

2、为什么需要分布式调度

我们在之前使⽤过Spring中提供的定时任务注解@Scheduled,在业务类中⽅法中贴上这个注解然后在启动类上贴上 @EnableScheduling 注解

那为什么又需要分布式调度?

感觉Spring给我们提供的这个注解可以完成任务调度的功能,好像已经完美解决问题了,为什么还需要分布式呢?

主要有如下这⼏点原因:

1.单机处理极限:原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来⼀个统计需要1⼩时,现在业务⽅需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处理。的确,多线程并⾏处理可以提⾼单位时间的处理效率,但是单机能⼒毕竟有限(主要是CPU、内存和磁盘),始终会有单机处理不过来的情况。

2.⾼可⽤:单机版的定式任务调度只能在⼀台机器上运⾏,如果程序或者系统出现异常就会导致功能不可⽤。虽然可以在单机程序实现的⾜够稳定,但始终有机会遇到⾮程序引起的故障,⽽这个对于⼀个系统的核⼼功能来说是不可接受的。

3.防⽌重复执⾏: 在单机模式下,定时任务是没什么问题的。但当我们部署了多台服务,同时⼜每台服务⼜有定时任务时,可能会出现任务重复执行

这个时候就需要分布式的任务调度来实现了。

3、Elastic-Job介绍

Elastic-Job是⼀个分布式调度的解决⽅案,由当当⽹开源,它由两个相互独⽴的⼦项⽬Elastic-job-Lite和

Elastic-Job-Cloud组成,使⽤Elastic-Job可以快速实现分布式任务调度。

Elastic-Job的地址: ElasticJob - Distributed scheduled job solution

功能列表:

  • 分布式调度协调

在分布式环境中,任务能够按照指定的调度策略执⾏,并且能够避免同⼀任务多实例重复执⾏。

  • 丰富的调度策略:

基于成熟的定时任务作业框架Quartz cron表达式执⾏定时任务。

  • 弹性拓容缩容

当集群中增加⼀个实例,它应当能够被选举被执⾏任务;当集群减少⼀个实例时,他所执⾏的任务能被转移到别的示例中执⾏。

  • 失效转移

某示例在任务执⾏失败后,会被转移到其他实例执⾏。

  • 错过执⾏任务重触发

若因某种原因导致作业错过执⾏,⾃动记录错误执⾏的作业,并在下次次作业完成后⾃动触发。

  • ⽀持并⾏调度

⽀持任务分⽚,任务分⽚是指将⼀个任务分成多个⼩任务在多个实例同时执⾏。

  • 作业分⽚⼀致性

当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。

  • ⽀持作业⽣命周期操作

可以动态对任务进⾏开启及停⽌操作。

  • 丰富的作业类型

⽀持Simple、DataFlow、Script三种作业类型

系统运行架构图

二、Zookeeper下载

建议:zookeeper3.4.6以上版本,JDK1.7以上,maven在3.0.4以上

我这里将Zookeeper安装到了Linux系统上,以我自己安装为例,可自行选择。

1.上传,将zookeeper-3.4.11.tar.gz上传到/usr/local/src/soft/zookeeper目录下

2.解压文件到指定目录

tar -zxvf /usr/local/src/soft/zookeeper-3.4.11.tar.gz -C /usr/local/src/soft/zookeeper

3.拷贝配置文件

cp /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo.cfg

4.启动

/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh start

5.检查进程是否开启

这个命令不一定有效

jps

可以试试这个命令,查看状态

/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh status

三、任务分片参数

1、分片的概念

作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或者几个分布项。

例如:Elastic-job快速入门中文件备份的案例,现有两台服务器,每台服务器分别跑一个应用实例。为了快速执行作业,那么可以讲任务分成4片,每个应用实例都执行两片。作业遍历数据逻辑应为:实例1查找text和image类型文件执行备份,实例2査找radio和vedio类型文件执行备份。如果由于服务器拓容应用实例数量增加为4,则作业谝历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的文件。

可以看到,通过对任务的合理分片化,从而达到任务并行处理的效果.

  • 当只有一台机器时,给定时任务分片四个,在机器中启动四个线程,分别处理四个分片的内容

  • 当只有两台机器时,分片由两台机器进行分配,A负责索引为0,1的内容,B负责索引为2,3的内容

  • 三台机器,如图

  • 四台机器,平均分摊

集群之后,可以分摊CPU的处理压力,提高数据处理的速度,那到底几台机器好呢?

分片数建议是机器个数的倍数

在秒杀项目中,我们将秒杀商品的场次分成了10、12、14三个场次。在这里我们就根据场次将其分成三片

2、分片分配机制

ElasticJob的分片分配机制

  • Zookeeper 协调:ElasticJob 通过 Zookeeper 协调任务实例的分片分配。每个任务实例启动时,会向 Zookeeper 注册自己,并获取分配给自己的分片信息。

  • 动态分片分配:如果任务实例的数量发生变化(比如新增或减少实例),Zookeeper 会重新分配分片,确保每个分片都有任务实例处理。

  • 分片参数传递:分片参数通过 ShardingContext 传递给任务实例,任务实例根据分片参数执行对应的逻辑。

ElasticJob 通过 Zookeeper 协调,将分片分配给不同的任务实例。每个任务实例启动时,会从 Zookeeper 获取分配给自己的分片信息(分片编号和分片参数)。

三、项目集成

1、依赖添加

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>
<!--zookeeper客户端-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.10.0</version>
</dependency>

2、分布式调度配置

1)注册中心配置

获取zk的地址和任务名称,将任务注册到zk注册中心中

@Configuration
public class RegistryCenterConfig {
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter createRegistryCenter(@Value("${elasticjob.zookeeper-url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {
        //zk的配置
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl,groupName);
        //设置zk超时时间
        zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
        //创建注册中心
        CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return zookeeperRegistryCenter;
    }
}
2)分布式调度参数配置
#分布式定时任务配置
elasticjob:
  zookeeper-url: 192.168.88.130:2181
  group-name: shop-job-group
  jobCron:
    #3分钟执行一次
    seckillProduct: 0 0/3 * * * ?
3)定时任务配置

需要使用定时任务的服务可能不止一个,不同的定时任务,表达式和分片参数、个数都不一样

将不同的定时任务创建不同的LiteJobConfiguration对象,指定不同的参数类型,将其创建为一个Bean,交给spring容器管理

@Configuration
public class BusinessJobConfig {
@Bean(initMethod = "init")
    public SpringJobScheduler initSPJob(CoordinatorRegistryCenter registryCenter, SeckillProductJob seckillProductJob){
        LiteJobConfiguration jobConfiguration = ElasticJobUtil.createJobConfiguration(
                seckillProductJob.getClass(),
                seckillProductJob.getCron(),//任务类的cron表达式
                seckillProductJob.getShardingTotalCount(), //分片个数
                seckillProductJob.getShardingParameters(), //分片参数
                seckillProductJob.isDataflowType());//不是dataflow类型
        SpringJobScheduler springJobScheduler = new SpringJobScheduler(seckillProductJob, registryCenter,jobConfiguration );
        return springJobScheduler;
    }
}    
4)分布式调度工具类

主要是用于创建LiteJobConfiguration对象,为不同的定时任务定义不同的配置类型

  • 指定定时任务类

  • 任务类的cron表达式

  • 分片个数

  • 分片参数

  • 是否为dataflow类型

public class ElasticJobUtil {
    public static LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,final String cron,final int shardingTotalCount,
final String shardingItemParameters,boolean dataflowType) {
        // 定义作业核心配置
        JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount);
        if(!StringUtils.isEmpty(shardingItemParameters)){
            jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobTypeConfiguration jobConfig = null;
        if(dataflowType){
            jobConfig = new DataflowJobConfiguration(jobCoreConfigurationBuilder.build(),jobClass.getCanonicalName(),true);
        }else {
            // 定义SIMPLE类型配置
            jobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName());
        }
        // 定义Lite作业根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build();
        return simpleJobRootConfig;
    }
    public static LiteJobConfiguration createDefaultSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {
        // 创建默认的SIMPLE类型作业配置
        return createJobConfiguration(jobClass,cron,1,null,false);
    }
    public static LiteJobConfiguration createDefaultDataFlowJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {
        // 创建默认的DataFlow类型作业配置
        return createJobConfiguration(jobClass,cron,1,null,true);
    }
}
5)任务分片参数配置
  1. 删除之前的数据

  2. 查询当天的数据同步Redis,定时任务每天执行一次

  3. 给定时任务分片处理,分三片

给定时任务做分片处理:0=10,1=12,2=14(第10场秒杀任务,第12场.........)

jobSharding:
  seckillProduct:
    shardingParameters: 0=10,1=12,2=14
    shardingTotalCount: 3
    dataflowType: false

3、定时任务类

初始化秒杀商品定时任务

@Data
@RefreshScope
@Component
public class SeckillProductJob implements SimpleJob {

    //表达式
    @Value("${elasticjob.jobCron.seckillProduct}")
    private String cron;
    //分片参数
    @Value("${jobSharding.seckillProduct.shardingParameters}")
    private String shardingParameters;
    //分片个数
    @Value("${jobSharding.seckillProduct.shardingTotalCount}")
    private int shardingTotalCount;

    @Value("${jobSharding.seckillProduct.dataflowType}")
    private boolean dataflowType;

    @Resource
    private SeckillProductFeignApi seckillProductFeignApi;

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void execute(ShardingContext shardingContext) {
        String time = shardingContext.getShardingParameter();
        //远程调用商品服务获取秒杀列表集合
        Result<List<SeckillProductVo>> result = seckillProductFeignApi.queryByTime(Integer.valueOf(time));
        if (result==null||result.hasError()) {
            //通知管理员
            return;
        }
        List<SeckillProductVo> seckillProductVoList = result.getData();
        //获取秒杀商品key-seckillProductHash:10
        String key = JobRedisKey.SECKILL_PRODUCT_HASH.getRealKey(time);
       
        //删除之前的缓存
        redisTemplate.delete(key);

        HashMap<String, String> seckillProductMap = new HashMap<>();
        //存储集合数据到Redis中
        for (SeckillProductVo vo : seckillProductVoList) {
            seckillProductMap.put(vo.getId().toString(), JSON.toJSONString(vo));
        }
        redisTemplate.opsForHash().putAll(key, seckillProductMap);
        System.out.println("分布式商品秒杀任务执行...............");
    }
}

秒杀列表缓存成功

4、任务分片处理逻辑

分片参数设置为 0=10,1=12,2=14,表示分片0处理时间参数为10的任务,分片1处理时间参数为12的任务,分片2处理时间参数为14的任务。ElasticJob 会根据分片参数将任务分片,并将每个分片分配给不同的任务实例执行。

  • 分片总数(ShardingTotalCount):表示任务的总分片数。比如你有3场秒杀活动,可以将分片总数设置为3,每个分片处理一场秒杀活动。

  • 分片参数(ShardingParameters):可以为每个分片指定参数,比如分片0处理第一场秒杀,分片1处理第二场秒杀,分片2处理第三场秒杀。

ElasticJob 通过 Zookeeper 协调,将分片分配给不同的任务实例。每个任务实例启动时,会从 Zookeeper 获取分配给自己的分片信息(分片编号和分片参数)。

今天的分享结束,感兴趣的兄弟请点赞、收藏,关注我不迷路!下期再见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2318948.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

架构设计的灵魂交响曲:系统设计各维度的深度解析与实战指南

引言: 系统设计的背景与重要性 在快速变化的技术环境中&#xff0c;数字化转型成为企业生存与发展的核心驱动力。系统设计能力不仅是技术团队的核心竞争力&#xff0c;也是推动业务创新和提升整体效率的关键因素。根据Gartner的研究&#xff0c;超过70%的数字化转型项目未能实…

[贪心算法]买卖股票的最佳时机 买卖股票的最佳时机Ⅱ K次取反后最大化的数组和 按身高排序 优势洗牌(田忌赛马)

1.买卖股票的最佳时机 暴力解法就是两层循环&#xff0c;找出两个差值最大的即可。 优化&#xff1a;在找最小的时候不用每次都循环一遍&#xff0c;只要在i向后走的时候&#xff0c;每次记录一下最小的值即可 class Solution { public:int maxProfit(vector<int>& p…

【 <二> 丹方改良:Spring 时代的 JavaWeb】之 Spring MVC 的核心组件:DispatcherServlet 的工作原理

<前文回顾> 点击此处查看 合集 https://blog.csdn.net/foyodesigner/category_12907601.html?fromshareblogcolumn&sharetypeblogcolumn&sharerId12907601&sharereferPC&sharesourceFoyoDesigner&sharefromfrom_link <今日更新> 一、Dispat…

第J3周:DenseNet121算法实现01(Pytorch版)

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 目标 具体实现 &#xff08;一&#xff09;环境 语言环境&#xff1a;Python 3.10 编 译 器: PyCharm 框 架: Pytorch &#xff08;二&#xff09;具体步骤…

webrtc3A算法

使用ubuntu18.04 选择webrtc_audio_processing v0.3 下载地址 https://gitlab.freedesktop.org/pulseaudio/webrtc-audio-processing/-/tree/master git clone 完 编译 # Initialise into the build/ directory, for a prefixed install into the # install/ directory meson …

让“树和二叉树”埋在记忆土壤中--性质和概念

Nice to meet your! 目录 树的介绍&#xff1a; 树的创建&#xff1a; 二叉树的概念和结构&#xff1a; 二叉树的存储结构&#xff1a; 树的介绍&#xff1a; 概念和结构&#xff1a; 不知你们是否在现实中看见过分为两个叉的枯树&#xff0c;大概长这样&#xff1a; 那…

Spring Boot整合SSE实现消息推送:跨域问题解决与前后端联调实战

摘要 本文记录了一次完整的Spring Boot整合Server-Sent Events&#xff08;SSE&#xff09;实现实时消息推送的开发过程&#xff0c;重点分析前后端联调时遇到的跨域问题及解决方案。通过CrossOrigin注解的实际应用案例&#xff0c;帮助开发者快速定位和解决类似问题。 一、项…

【工具分享】vscode+deepseek的接入与使用

目录 第一章 前言 第二章 获取Deepseek APIKEY 2.1 登录与充值 2.2 创建API key 第三章 vscode接入deepseek并使用 3.1 vscode接入deepseek 3.2 vscode使用deepseek 第一章 前言 deepseek刚出来时有一段时间余额无法充值&#xff0c;导致小编没法给大家发完整的流程&…

康谋方案 | AVM合成数据仿真验证方案

随着自动驾驶技术的快速发展&#xff0c;仿真软件在开发过程中扮演着越来越重要的角色。仿真传感器与环境不仅能够加速算法验证&#xff0c;还能在安全可控的条件下进行复杂场景的重复测试。 本文将分享如何利用自动驾驶仿真软件配置仿真传感器与搭建仿真环境&#xff0c;并对…

Linux内核IPv4路由选择子系统

一、基本知识 1.具体案例&#xff1a;直连路由 结构fib_nh表示下一跳&#xff0c;包含输出网络设备、外出接口索引等信息。 有两个以太网局域网 LAN1 和 LAN2&#xff0c;其中 LAN1 包含子网 192.168.1.0/24&#xff0c;而 LAN2 包含子网 192.168.2.0/24。在这两个 LAN 之…

NWAFU 生物统计实验二 R语言版

#1 setwd(修改为你的工作路径或桌面路径) feed_types <- c("A", "B", "C") weight_gain_means <- c(36.8, 34.9, 21.3) weight_gain_sds <- c(2.4, 2.7, 6.6) weight_gain <- rnorm(3, mean weight_gain_means, sd weight_gain_sd…

Thinkphp指纹识别

识别ThinkPHP框架(指纹) 1.ioc判断 /favicon.ico 2.报错 /1 然后使用工具梭哈

【AVRCP】蓝牙AVRCP协议中的L2CAP互操作性要求深度解析

目录 一、L2CAP互操作性要求&#xff08;针对AVRCP&#xff09; 1.1 核心概念 1.2 AVRCP对L2CAP的增强需求 1.3 关键机制解析 1.4 浏览通道优化配置 1.5 实际应用场景与解决方案 二、通道类型与配置 2.1. 通道类型限制 2.2 PSM字段规范 2.3. 实现意义 3.4. 实际应用…

剑指 Offer II 111. 计算除法

comments: true edit_url: https://github.com/doocs/leetcode/edit/main/lcof2/%E5%89%91%E6%8C%87%20Offer%20II%20111.%20%E8%AE%A1%E7%AE%97%E9%99%A4%E6%B3%95/README.md 剑指 Offer II 111. 计算除法 题目描述 给定一个变量对数组 equations 和一个实数值数组 values 作…

掌握 WRF/Chem 模式:突破大气环境研究技术瓶颈的关键

技术点目录 第一部分、WRF-Chem模式应用案例和理论基础第二部分、Linux环境配置及WRF-CHEM第三部分、WRF-Chem模式编译&#xff0c;排放源制作第四部分、WRF-Chem数据准备&#xff08;气象、排放、初边界条件等&#xff09;&#xff0c;案例实践第五部分、模拟结果提取、数据可…

linux性能监控的分布式集群 prometheus + grafana 监控体系搭建

prometheusgrafana分布式集群资源监控体系搭建 前言一、安装 prometheus二、在要监控的服务器上安装监听器三、prometheus服务器配置四、grafana配置大屏五、创建Linux监控看板五、监控windows服务器注意事项 前言 Prometheus 是一个开源的 ​分布式监控系统 和 ​时间序列数据…

数字化转型 2.0:AI、低代码与智能分析如何重塑企业竞争力?

引言&#xff1a;数字化转型进入2.0时代 在过去的十几年里&#xff0c;企业的数字化转型&#xff08;1.0&#xff09;主要围绕信息化和自动化展开&#xff0c;例如引入ERP、CRM等系统&#xff0c;提高办公效率&#xff0c;减少人为失误。然而&#xff0c;随着市场竞争加剧&…

基于SpringBoot的“校园招聘网站”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“校园招聘网站”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统整体功能图 局部E-R图 系统首页界面 系统注册…

由LAC自动建立L2TP实验

一、实验拓扑: 二、实验配置 1.LAC的配置 基础配置: [LAC]int g 0/0/0 [LAC-GigabitEthernet1/0/0]ip address 192.168.0.1 24 [LAC]int g 1/0/0 [LAC-GigabitEthernet1/0/0]ip address 10.1.1.254 24 [LAC-GigabitEthernet1/0/0]int g1/0/1 [LAC-GigabitEthernet1/0/1]ip ad…

内网渗透(CSMSF) 构建内网代理的全面指南:Cobalt Strike 与 Metasploit Framework 深度解析

目录 1. Cobalt Strike 在什么情况下会构建内网代理&#xff1f; 2. Cobalt Strike 构建内网代理的主要作用和目的是什么&#xff1f; 3. Cobalt Strike 如何构建内网代理&#xff1f;需要什么条件和参数&#xff1f; 条件 步骤 参数 4. Cobalt Strike 内网代理能获取什…