Springboot——集成Elastic Job实现任务调度

news2024/11/17 5:51:20

目录

1.任务调度

2.Elastic Job

3.springboot集成Elastic Job


1.任务调度

什么是任务调度?

任务调度就是指系统为了自动地完成特定任务,在指定的时刻去执行任务的过程,其目的是为了让系统自动且精确地完成任务从而解放人力资源。

如:电商平台在指定地时刻开启抢购活动,而不是需要人力开启;系统在指定的时刻自动发送短信提醒客户;系统在每月的某个时刻进行订单、财务数据等相关数据的整理。

那什么是分布式任务调度?

分布式是一种系统架构,指同一个系统下的不同子服务,分布于不同的机器上,服务之间通过网络交互完成整个系统的业务处理。

分布式调度就是在分布式系统架构下,一个服务往往会拥有多个实例,在这种分布式架构下的任务调度过程。

如上图所示,同一个分布式架构系统下由不同的微服务构成,每一个微服务都拥有多个实例,在此情况下共同完成任务调度。

分布式任务调度的目的就是并向地进行任务调度,提高任务的调度处理能力;有效地分配资源,实现任务处理能力随资源配备弹性伸缩;保证任务的高可用。多个实例共同完成任务调度,即使一个实例挂了,还能由其他实例继续完成;通过zookeeper选举leader的方式,避免任务重复执行。

2.Elastic Job

 Elastic Job 基于 Quartz 和 Curator 开发,是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。

它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。 它的各个产品使用统一的作业 API,开发者仅需一次开发,即可随意部署。

其中,ElasticJob-Lite 架构如下:

APP即应用程序,内部包含了执行业务逻辑和Elastic-job-lite组件。

Elastic-job-lite:定位为轻量级的无中心化解决方案,负责任务的调度以及产生日志和任务调度记录。无中心化即没有调度中心,各作业节点都是平等的,通过zookeeper分布式协调。

Registery:注册中心,以zookeeper作为Elastic-job-lite的注册中心组件,通过执行任务实例选举的方式保证任务不重复执行。

Console:运维平台,可以查看日志文件等相关信息。

简单来说,就是我们在实例中编写好任务执行逻辑,在指定的时刻告诉elastic job框架,elastic job框架通过配置好的作业分片信息,进行任务调度;elastic job框架通过zookeeper找到该任务对应的命名空间等信息,通过选举leader的形式,选择执行的实例,最后让实例执行自动任务。

我们可以通过 Elastic Job 实现分布式的任务调度,有效地分配资源,达到提高任务执行效率,避免任务重复执行等目的。

3.springboot集成Elastic Job

使用elastic job环境要求如下:

在springboot项目中导入依赖:

    <dependencies>
        <!--springboot相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--elastic-job-lite-->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>
        <!--zookeeper curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.5</version>
        </dependency>
        <!--mysql-druid-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <!-- mybatis-plus启动器-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.1</version>
        </dependency>
    </dependencies>

(1)配置注册中心:

import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JobRegistryCenterConfig {
    //zookeeper端口
    private static final int ZOOKEEPER_PORT = 2181;
    private static final String JOB_NAMESPACE = "elastic-job-example-java";

    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter setupRegistryCenter(){
        //zk配置
        ZookeeperConfiguration zookeeper = new ZookeeperConfiguration("localhost:" + ZOOKEEPER_PORT, JOB_NAMESPACE);
        zookeeper.setSessionTimeoutMilliseconds(100);
        //创建注册中心
        return new ZookeeperRegistryCenter(zookeeper);
    }
}

(2)然后,我们准备一些数据:

@TableName(value ="FileCustom")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FileCustom implements Serializable {
    @TableId
    private Integer id;
    private String name;
    private String type;
    private String content;
    private boolean backup;

    @TableField(exist = false)
    private static final long serialVersionUID = 1L;
}

 

业务类(基于mybatis plus实现):主要实现业务,根据文件类型查询所有未备份文件

@Service
public class FileCustomServiceImpl extends ServiceImpl<FileCustomMapper, FileCustom>
    implements FileCustomService {
    @Resource
    private FileCustomMapper filecustomMapper;

    @Override
    public List<FileCustom> getFileList(String fileType, int size) {
        //分页参数
        Page<FileCustom> page = Page.of(1,size);
        QueryWrapper<FileCustom> queryWrapper = new QueryWrapper<FileCustom>()
                .eq("type", fileType)
                .eq("backup",false);
        filecustomMapper.selectPage(page,queryWrapper);
        return page.getRecords();
    }
}

关于为什么是根据文件类型查询:

系统分布式地部署在两台服务器上,各有一个实例执行文件自动备份任务。那么可以将作业分为2片,每台服务器上的实例执行两片,每一片作业分别处理text、image类型的文件。

所以,对任务合理地分片化(一般分片项设置大于服务器的数量,最好是服务器数量的倍数)可以最大限度地提升执行作业的吞吐量。

(3)编写elastic job任务执行逻辑:

SimpleJob形式:简单实现,未经过任何封装。

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.seven.scheduler.entities.FileCustom;
import com.seven.scheduler.service.FileCustomService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/**
 * 文件备份任务
 */
@Slf4j
@Component
public class FileBackupJob implements SimpleJob {

    @Resource
    private FileCustomService filecustomService;

    //每次任务执行备份文件数量
    private final int FETCH_SIZE = 1;

    @Override
    public void execute(ShardingContext shardingContext) {
        //获取分片参数
        //此处分片参数为文件类型,根据类型对任务进行分片
        //"0=txt,1=jpg"
        String jobParameter = shardingContext.getShardingParameter();
        log.info("作业编号:"+shardingContext.getShardingItem()+",处理"+jobParameter+"类型数据");
        //获取文件
        List<FileCustom> list = filecustomService.getFileList(jobParameter,FETCH_SIZE);
        //备份文件
        backupFiles(list);
    }

    //文件备份
    public void backupFiles(List<FileCustom> list){
        list.forEach(fileCustom -> {
            fileCustom.setBackup(true);
            filecustomService.updateById(fileCustom);
            log.info(fileCustom.getName()+"进行备份");
        });
    }
}

DataflowJob形式,用于处理数据流。

可通过DataflowJobConfiguration配置是否流式处理:流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.seven.scheduler.entities.FileCustom;
import com.seven.scheduler.service.FileCustomService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@Component
public class FileBackupDataFlowJob implements DataflowJob<FileCustom> {

    @Resource
    private FileCustomService fileCustomService;

    //抓取数据
    @Override
    public List<FileCustom> fetchData(ShardingContext shardingContext) {
        //获取分片参数
        //此处分片参数为文件类型,根据类型对任务进行分片
        //"0=txt,1=jpg,2=png,3=video"
        String jobParameter = shardingContext.getShardingParameter();
        log.info("作业编号:"+shardingContext.getShardingItem()+",处理"+jobParameter+"类型数据");
        //获取文件
        return fileCustomService.getFileList(jobParameter,1);
    }

    //处理数据
    @Override
    public void processData(ShardingContext shardingContext, List<FileCustom> data) {
        backupFiles(data);
    }
    //文件备份
    public void backupFiles(List<FileCustom> list){
        list.forEach(fileCustom -> {
            fileCustom.setBackup(true);
            fileCustomService.updateById(fileCustom);
            log.info(fileCustom.getName()+"进行备份");
        });
    }
}

从以上两种方式中选取一种类型进行业务代码(都是获取数据,处理数据)的编写。

(4)定义任务调度类,配置作业分片等信息:

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.seven.scheduler.job.FileBackupDataFlowJob;
import com.seven.scheduler.job.FileBackupJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration
@Slf4j
public class ElasticJobConfig {

    @Resource
    private FileBackupJob fileBackupJob;
    @Resource
    private FileBackupDataFlowJob fileBackupDataFlowJob;
    @Resource
    private CoordinatorRegistryCenter registryCenter;
    //数据库数据源
    @Resource
    private DataSource dataSource;

    @Bean(initMethod = "init")
    public SpringJobScheduler initSimpleElasticJob(){
        //生成日志表
        JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
        return new SpringJobScheduler(fileBackupJob,registryCenter,
                createJobConfiguration(fileBackupJob.getClass(),"0 20 17 * * ?",
                        2,"0=txt,1=jpg"),jobEventRdbConfiguration);
    }


    private LiteJobConfiguration createJobConfiguration(final Class<? extends ElasticJob> jobClass,
                                                        final String cron, final int shardingTotalCount,
                                                        final String shardingItemParameters){
        JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
        if (!StringUtils.isEmpty(shardingItemParameters)){
            builder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration jobCoreConfiguration = builder.build();
        //simpleJob形式
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,jobClass.getCanonicalName());
        return LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
        
        //DataflowJob形式,最后的参数true即为开启流式抓取
//        DataflowJobConfiguration dataflowJobConfiguration =
//                new DataflowJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName(), true);
//        return LiteJobConfiguration.newBuilder(dataflowJobConfiguration).overwrite(true).build();
    }
}

上述代码中,把任务分为了两片,0号作业负责备份txt类型数据,1号作业负责备份jpg类型数据;该任务在每天的17:20执行(具体参考cron表达式)

(选用DataflowJob形式则把 fileBackupJob 都换成fileBackupDataFlowJob, simpleJobConfiguration换为DataflowJobConfiguration即可。)

(5)我们就可以执行了,分贝启动两个进程,对数据进行处理,执行效果如下:

可以看到,17:20自动执行了任务。

作业0处理了txt文件,作业1处理了jpg文件;两个进程分别处理了各自的文件,实现了资源的合理分配,同时避免了对数据的重复处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/503547.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WX小程序 - 1

视图层&#xff1a;WXML&#xff0c;WXSS 逻辑层&#xff1a;JS 响应数据绑定&#xff0c;事件绑定 勾选这个其实就是解决跨域问题&#xff08;仅限本地开发阶段&#xff09;。 上线需要去合法域名添加。 app.json 文件创建和删除&#xff0c;保持一致&#xff0c;否则报错…

二叉树的层序遍历思想模板

分为两种&#xff1a; 1.第一种是直接将遍历的数据保存到列表里&#xff1b; 2.第二种是将每一层的数据以列表形式保存在列表&#xff1b;&#xff08;今天要讲述的内容&#xff09; 代码如下&#xff0c;思想在后 class Solution {public List<List<Integer>> …

全新 – Amazon EC2 R6a 实例由第三代 AMD EPYC 处理器提供支持,适用于内存密集型工作负载

我们在 Amazon re:Invent 2021 上推出了通用型 Amazon EC2 M6a 实例&#xff0c;并于今年 2 月推出了计算密集型 C6a 实例。这些实例由运行频率高达 3.6 GHz 的第三代 AMD EPYC 处理器提供支持&#xff0c;与上一代实例相比&#xff0c;性价比提高多达 35%。 如今&#xff0c;…

不断联的从Google Drive下载超大文件

不断联的从Google Drive下载超大文件 最近在研究OWOD代码&#xff0c;需要从google drive 下载超大文件&#xff0c;普通方式下载&#xff0c;首先得有个上外网的工具&#xff0c;其次下载过程中总是会断开&#xff0c;所以看了一些博客&#xff0c;总结如下&#xff1a; 安…

基于TINY4412的Andorid开发-------简单的LED灯控制【转】

基于TINY4412的Andorid开发-------简单的LED灯控制 阅读目录(Content) 一、编写驱动程序二、编写代码测试驱动程序三、编写HAL代码四、编写Framework代码五、编写JNI代码六、编写App 参考资料&#xff1a; 《Andriod系统源代码情景分析》 《嵌入式Linux系统开发完全手册_基…

实时语义分割PIDNet算法TensorRT转换

[PIDNet](GitHub - XuJiacong/PIDNet: This is the official repository for our recent work: PIDNet) 是22年新开源的实时语义分割算法&#xff0c;和DDRNet一样具有不错的性能 网络结构如下&#xff1a; 网络分为三个分支&#xff0c;如上图&#xff0c;整体结构和DDRNet比…

shell 脚本中的函数

目录 一. shell 函数作用&#xff1a;二. shell 函数的定义格式&#xff1a;三.函数返回值&#xff1a;四.函数传参&#xff1a;扩展&#xff1a; 六. 函数变量的作用范围:七 . 递归7.1阶乘 八. 函数库 一. shell 函数作用&#xff1a; 使用函数可以避免代码的重复 使用函数可以…

OJ刷题 第十五篇(递推较多,奥赛篇)

31005 - 昆虫繁殖&#xff08;难度非常大&#xff0c;信息奥赛题&#xff09; 时间限制 : 1 秒 内存限制 : 128 MB 科学家在热带森林中发现了一种特殊的昆虫&#xff0c;这种昆虫的繁殖能力很强。每对成虫过x个月产y对卵&#xff0c;每对卵要过两个月长成成虫。假设每个成虫…

从零开始 Spring Boot 27:IoC

从零开始 Spring Boot 27&#xff1a;IoC 自从开始学习和从事Spring Boot开发以来&#xff0c;一个一直让我很迷惑的问题是IoC和Bean到底是什么东西。这个问题一直到我翻阅完Spring开发文档Core Technologies (spring.io)后才真正得到解惑。 虽然中文互联网上关于IoC的文章很多…

基于AT89C51单片机的电子琴设计与仿真

点击链接获取Keil源码与Project Backups仿真图&#xff1a; https://download.csdn.net/download/qq_64505944/87765092?spm1001.2014.3001.5503 源码获取 运用单片机&#xff0c;将音乐的大部分音符与相应按键相匹配&#xff0c;让音乐爱好者利用单片机也可以进行演奏。 基…

SAP EWM /SCWM/CHM_LOG - 显示及分析检查日志

很多公司上了EWM系统后会在运行一段时间之后出现一些系统数据异常情况&#xff0c;问题大致分为以下一些情况&#xff1a; 序号异常情况1 库存调整过账后可用数量的数据不正确2 错误地传输原产国 (2)3 HU 在可用库存中&#xff0c;即使不允许 HU4 非 AQUA 级别创建仓库任务后可…

TCP通讯(三次握手、四次挥手;滑动窗口;TCP状态转换;端口复用;TCP心跳检测机制)

前言&#xff1a;建议看着图片&#xff0c;根据文字描述走一遍TCP通讯过程&#xff0c;加深理解。 目录 TCP通信时序&#xff1a; 1&#xff09;建立连接&#xff08;三次握手&#xff09;的过程&#xff1a; 2&#xff09;数据传输的过程&#xff1a; 3&#xff09;关闭连…

智能优化算法:斑马优化算法-附代码

智能优化算法&#xff1a;斑马优化算法 文章目录 智能优化算法&#xff1a;斑马优化算法1.斑马优化算法1.1 初始化1.2 阶段一&#xff1a;觅食行为1.3 阶段二&#xff1a;针对捕食者的防御策略 2.实验结果3.参考文献4.Matlab 摘要&#xff1a;斑马优化算法&#xff08;Zebra Op…

协同设计有什么优势?都有哪些协同设计软件

设计师创作既有视觉吸引力又实用的作品需要很多时间。对于某些项目&#xff0c;第一次可能会顺利验收&#xff0c;但事实上&#xff0c;设计和修改总是伴随着。 如何有效地修改和促进项目的实施&#xff1f;答案很简单&#xff1a;协作设计。本文将带您深入学习协作设计的相关…

Spring01-Spring简介、IOC简介及入门、IOC详解、bean讲解、依赖注入、配置文件模块化、实现CRUD

Spring简介 一、Spring 是什么 Spring 是一个分层的 Java SE/EE full-stack (一站式) 轻量级开源框架 以 IOC &#xff08;Inverse Of Control:反转控制) 和 AOP &#xff08;Aspect Oriented Programming:面向切面编程) 为核心。 在 Java三层架构中 分别提供了响应技术 分…

docker php安装redis扩展

有这么一个情况&#xff0c;我在docker中&#xff0c;安装了镜像&#xff0c;也启动了容器&#xff0c;容器有&#xff1a;nginx、mysql、redis、php 是一个基本的开发环境 容器启动成功&#xff0c;我们先连接一下&#xff0c;看看是否正常。 先保证这些都ok&#xff0c;我们…

【Spring框架一】——Spring框架简介

系列文章目录 Spring框架简介 系列文章目录前言一、什么是Spring框架&#xff1f;二、Spring框架的优势1.简化开发流程&#xff1a;Spring提供了许多现成的功能&#xff0c;可以使得开发人员在构建应用程序时减少编写重复代码的工作。2.提高可维护性&#xff1a;Spring框架采用…

chatgpt搜索脚本

安装地址 https://greasyfork.org/zh-CN/scripts/459997 注意事项 &#xff01;&#xff01;注意&#xff1a;如果你在360相关浏览器上使用插件。360搜索将不会生效&#xff0c;因为已被浏览器禁用在so.com网址上使用。 &#xff01;&#xff01;尽量选择tampermonkey脚本管…

面试华为测试岗,收到offer后我却毫不犹豫拒绝了....

我大学学的是计算机专业&#xff0c;毕业的时候&#xff0c;对于找工作比较迷茫&#xff0c;也不知道当时怎么想的&#xff0c;一头就扎进了一家外包公司&#xff0c;一干就是2年。我想说的是&#xff0c;但凡有点机会&#xff0c;千万别去外包&#xff01; 在深思熟虑过后&am…

Android性能优化专家需求量大,人才缺口呼之欲出

前言 Android性能优化是Android应用开发中一个非常重要的环节。一款高性能、流畅的应用可以提高用户体验和满意度&#xff0c;提升应用的用户留存率和活跃度。而在今天&#xff0c;移动设备日趋普及&#xff0c;市场竞争日益激烈&#xff0c;优秀的性能已经成为了Android应用不…