Java使用elasticjob实现定时任务(v2.1.5)

news2025/2/27 2:06:45

elastic是一个定时任务库

https://shardingsphere.apache.org/elasticjob/index_zh.html

bff3a57751ea742698917f55e0557d5e.png

项目结构

ba89260324b237010225fa45c99d4290.png

​依赖

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>

实现simplejob

simplejob是使用最多、最简单的定时任务

任务类

定时任务类需要实现相应的定时任务接口(idea快捷键 ctrl+i)

public class MySimpleJob implements SimpleJob

然后在实现的execute里写定时任务的逻辑

public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("分片项: " + shardingContext.getShardingItem() +
                ",总分片项数: " + shardingContext.getShardingTotalCount());
    }
}

定时任务配置

新建App.java

public class App {
} 

添加配置信息(都写在App.java里)

1)zookeeper配置信息(zookeeper作为注册中心,elasticjob将服务注册到zookeeper)

zookeeper搭建可以看我的这一篇文章

在windows搭建zookeeper(单机/集群) - 知乎
    /**
     * 注册中心zookeeper
     */
    public static CoordinatorRegistryCenter zkCenter() {
        // 参数1: zk的地址(集群就写多个,中间用逗号隔开),参数2: 命名空间
        var zc =
                new ZookeeperConfiguration("localhost:2181", "java-simple-job");
        var crc = new ZookeeperRegistryCenter(zc);
        // 初始化注册中心
        crc.init();
        return crc;
    }

2)simplejob任务配置

    /**
     * simple-job配置
     *
     * @return
     */
    public static LiteJobConfiguration configurationSimple() {
        // 1,job核心配置
        var jcc = JobCoreConfiguration
                // 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量
                .newBuilder("mySimpleJob", "0/10 * * * * ?", 2)
                .build();
        // 2,job类型配置
        // 参数1: 核心配置,参数2: 任务类的全类名
        var jtc = new SimpleJobConfiguration(jcc, MySimpleJob.class.getCanonicalName());
        // 3,job根配置 (LiteJobConfiguration)
        return LiteJobConfiguration.newBuilder(jtc)
                // 有这个才能重新布置任务,否则修改不会生效
                .overwrite(true)
                .build();
    }

3)启动定时任务

    public static void main(String[] args) {
        // 启动定时任务
        // 参数1: 注册中心;参数2: 配置
        new JobScheduler(zkCenter(), configurationSimple()).init();
    }

启动

6e23bf6b17513046cfb242a2287ed547.png

因为我们设置的分片数量是2,所以可以启动另一个定时任务,elasticjob会自动分配任务

5ae8c0ea77479358af4513c9115a76eb.png

684c25b9f11c32a15e400d0da0eeb7ca.png

复制运行配置

启动两个任务,可以看到自动分配任务,原本是一个服务执行分片1和0,现在是分别执行单个任务

dbaa405674086c2f1e712ed5dcce613b.png

caa9dfdcb430d467751bcf161e5e8115.png

dataflow任务

dataflow任务适合处理流式作业,和simplejob不同,分为数据抓取和处理,先获取数据然后进行处理

订单类(被处理的类)

public class Order {
    private Integer orderId;
    // 0 未处理; 1 已处理
    private Integer status;

    @Override
    public String toString() {
        return "Order{" +
                "orderId=" + orderId +
                ", status=" + status +
                '}';
    }

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }
}

任务类

实现接口,有两个方法,对应抓取和处理,抓取方法的返回值会交给处理方法

public class MyDataflowJob implements DataflowJob<Order> { 
    // 抓取数据
    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
       return null;
    }

    // 处理数据
    @Override
    public void processData(ShardingContext shardingContext, List<Order> data) { 
    }
} 

具体逻辑:初始化100个order,然后抓取指定数据(status为0 并且 订单号%分片总数 == 当前分片项)的订单进行处理,返回值交给处理方法,处理方法进行处理(将order的status设置为1)

public class MyDataflowJob implements DataflowJob<Order> {
    private List<Order> orders = new ArrayList<Order>();

    {
        // 实例化该类时执行
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            // 未处理
            order.setStatus(0);
            orders.add(order);
        }
    }

    // 抓取数据
    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        // 将 订单号%分片总数 == 当前分片项 的订单进行处理
        var orderList = orders.stream()
                // 过滤状态为0的
                .filter(o -> o.getStatus() == 0)
                .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount()
                        == shardingContext.getShardingItem())
                // 放入集合
                .collect(toList());
        List<Order> subList = null;
        if (orderList != null && orderList.size() > 0) {
            // (抓)截取list
            subList = orderList.subList(0, 10);
        }
        try {
            // 休眠3秒
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LocalTime time = LocalTime.now();
        System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",我抓取的数据是: " + subList);
        return subList;
    }

    // 处理数据
    @Override
    public void processData(ShardingContext shardingContext, List<Order> data) {
        // 设置为已处理,下次不会再抓取到
        data.forEach(o -> o.setStatus(1));
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LocalTime time = LocalTime.now();
        System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",正在处理数据!");
    }
}

App.java

1)dataflow任务配置

   /**
     * dataflow-job配置
     *
     * @return
     */
    public static LiteJobConfiguration configurationDataflow() {
        // 1,job核心配置
        var jcc = JobCoreConfiguration
                // 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量
                .newBuilder("myDataflowJob", "0/10 * * * * ?", 2)
                .build();
        // 2,job类型配置
        // 参数1: 核心配置,参数2: 任务类的全类名,参数3: 是否开启定时任务(不开则只执行1次)
        var jtc =
                new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(), true);
        // 3,job根配置 (LiteJobConfiguration)
        return LiteJobConfiguration.newBuilder(jtc)
                // 有这个才能重新布置任务,否则修改不会生效
                .overwrite(true)
                .build();
    }

2)main方法

    public static void main(String[] args) {
        // 启动定时任务
        // 参数1: 注册中心;参数2: 配置
        new JobScheduler(zkCenter(), configurationDataflow()).init();
    }


启动

0363103046d4c3da2737246318e51fde.png

script任务

可以运行脚本文件(cmd、python……)

d盘下新建test.txt,修改内容后重命名为.cmd

%1这些是用来接收elastic传递来的参数的

echo running cmd cript: %1,%2,%3,%4,%5

App.java

1)任务配置

    /**
     * script-job配置
     *
     * @return
     */
    public static LiteJobConfiguration configurationScript() {
        // 1,job核心配置
        var jcc = JobCoreConfiguration
                // 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量
                .newBuilder("myScriptJob", "0/10 * * * * ?", 2)
                .build();
        // 2,job类型配置
        // 参数1: 核心配置,参数2: 任务脚本所在目录
        var jtc =
                new ScriptJobConfiguration(jcc, "d:/test.cmd");
        // 3,job根配置 (LiteJobConfiguration)
        return LiteJobConfiguration.newBuilder(jtc)
                // 有这个才能重新布置任务,否则修改不会生效
                .overwrite(true)
                .build();
    }

2)main方法

    public static void main(String[] args) {
        // 启动定时任务
        // 参数1: 注册中心;参数2: 配置
        new JobScheduler(zkCenter(), configurationScript()).init();
    }

0c3768fe9861e8e2cb0bfc86fc9b7c97.png

后续更文:springboot整合(2.1.5和3.0.0-alpha)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/412907.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【linux】——进程和计划任务管理

文章目录1.进程 VS 线程1.1 程序和进程的关系1.2 线程1.3 进程和线程的关系2.查看进程2.1 查看进程信息ps2.2 查看进程信息top2.3 查看进程信息pgrep2.4 查看进程树pstree3.控制进程3.1 进程的启动方式3.2 进程的前后台调度3.3 终止进程的运行kill3.4 终止进程的运行pkill4.计划…

【华为OD机试】1039 - 迷宫问题

文章目录一、题目&#x1f538;题目描述&#x1f538;输入输出&#x1f538;样例1&#x1f538;样例2二、代码参考作者&#xff1a;KJ.JK&#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &am…

CASA(Carnegie-Ames-Stanford Approach)模型应用

植被作为陆地生态系统的重要组成部分对于生态环境功能的维持具有关键作用。植被净初级生产力&#xff08;Net Primary Productivity, NPP&#xff09;是指单位面积上绿色植被在单位时间内由光合作用生产的有机质总量扣除自养呼吸的剩余部分。植被NPP是表征陆地生态系统功能及可…

全网最详细,Jmeter性能测试-性能基础详解,控制器不同选择(四)

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 逻辑控制器 提前说…

机器学习实战:Python基于支持向量机SVM-RFE进行分类预测(三)

文章目录1 前言1.1 支持向量机的介绍1.2 支持向量机的应用2 demo数据集演示2.1 导入函数2.2 构建数据集拟合2.3 预测模型及可视化3 实例演示分类&#xff08;非SVM&#xff09;3.1 导入函数和数据3.2 简单线性分类3.3 最大间隔决定分类4 实例演示分类&#xff08;SVM&#xff0…

面试之Java的SPI机制详细讲解你会吗?

很多小伙伴对SPi不是很熟悉&#xff0c;今天我给大家详细讲解分享下&#xff1a; Java之SPI机制详细目录 1: SPI机制简介 2: SPI原理 3: 使用场景 4: 源码论证 5: 实战 6: 优缺点 6.1 优点 6.2 缺点 Java之SPI机制详解 1: SPI机制简介 SPI 全称是 Service Provider Interface…

Django整合mysqlclinet步骤

准备工作 要在 Django 中使用 MySQL 数据库&#xff0c;您需要完成以下步骤&#xff1a; 安装 MySQL 服务器和客户端。你可以从官方网站下载并安装&#xff1a;https://www.mysql.com/downloads/ 安装 mysqlclient。mysqlclient 是一个 Python 的第三方库&#xff0c;用于连接…

chapter-7数据库事务

以下课程来源于MOOC学习—原课程请见&#xff1a;数据库原理与应用 考研复习 DBMS保证系统中一切事务的原子性、一致性、隔离性和持续性 DBMS必须对事务故障、系统故障和介质故障进行恢复 恢复中最经常使用的技术&#xff1a;数据库转储和登记日志文件 恢复的基本原理&#…

十二、网络规划与设计

&#xff08;一&#xff09;网络设计基础 1、网络系统生命周期 &#xff08;1&#xff09;四阶段周期&#xff08;重叠&#xff09; 构思与规划阶段、分析与设计阶段、实施与构建阶段、运行与维护阶段 特点&#xff1a;能够快速适应新的需求变化&#xff0c;成本低&#xf…

【SQL 初阶教程】一文轻松玩转 SQL

目录 一、SQL 通用语法 二、SQL 语句的分类 三、DDL语句 DDL——数据库操作 查询所有数据库 语句&#xff1a; SHOW DATABASES;&#xff08;大小写均可&#xff0c;建议大写&#xff09; 创建数据库HSK 语句&#xff1a;CREATE DATABASE HSK; 删除数据库HSK 语句 &#…

关于本地git通过ssh链接github时 time out问题的解决方法

目录问题描述解决方法问题描述 我们如果想要用git ssh链接到远端github&#xff0c;进行repo的clone等操作时&#xff0c;会进行如下的操作&#xff1a; 首先在ssh端命令生成rsa秘钥&#xff0c;命令如下&#xff1a; ssh-keygen -t rsa -C “你的git绑定的邮箱名字”然后在g…

【HBase-读写流程】HBase的读写流程与内部执行机制

【HBase-读写流程】HBase的读写流程与内部执行机制1&#xff09;HBase 读取数据流程1.1.文字描述1.2.流程图2&#xff09;HBase 写入数据流程2.1.文字描述2.2.流程图3&#xff09;flush 机制与 compact 机制的原理3.1.文字描述3.2.流程图1&#xff09;HBase 读取数据流程 1.1.…

游戏开发之Unity2021URP项目场景的构建

地面的修改和编辑&#xff1a;地面插件的使用 打开包管理器&#xff0c;在左边的包那里选择“Unity注册表”&#xff0c;在右边进行搜索“Polybrush”&#xff0c;之后选择右下角的安装 安装完之后要选择样本中的URP进行导入&#xff0c;因为我们的项目是URP渲染管线的&#x…

IronOCR for .NET crack,IronOCR的独特功能

IronOCR for .NET crack,IronOCR的独特功能  在IronTesseract上添加了新的“ReadPdfAndOverlayText”方法&#xff0c;该方法允许您添加文本并保留原始PDF书签/注释。 添加了对存储在应用程序子文件夹中的.config和.json文件中的许可证密钥的支持。 将IronSoftware.System.Dra…

shell 函数和数组作业

1、编写函数&#xff0c;实现打印绿色OK和红色FAILED,判断是否有参数&#xff0c;存在为Ok&#xff0c;不存在为FAILED 2、编写函数&#xff0c;实现判断是否无位置参数&#xff0c;如无参数&#xff0c;提示错误 3、编写函数实现两个数字做为参数&#xff0c;返回最大值 4、…

Minecraft 1.12.2模组开发(五十六) 网络(Networking)

我们本次在模组中实现客户端向服务器发送数据的功能。 演示效果演示效果演示效果 1.新建packet包&#xff0c;包中新建PacketHandler类&#xff1a; PacketHandler.java package com.joy187.mcjoygun.packet;import com.joy187.mcjoygun.Main; import com.joy187.mcjoygun.u…

Huggingface微调BART的代码示例:WMT16数据集训练新的标记进行翻译

BART模型是用来预训练seq-to-seq模型的降噪自动编码器&#xff08;autoencoder&#xff09;。它是一个序列到序列的模型&#xff0c;具有对损坏文本的双向编码器和一个从左到右的自回归解码器&#xff0c;所以它可以完美的执行翻译任务。 如果你想在翻译任务上测试一个新的体系…

Java Stream API 操作完全攻略:让你的代码更加出色 (四)

前言 Java Stream 是一种强大的数据处理工具&#xff0c;可以帮助开发人员快速高效地处理和转换数据流。使用 Stream 操作可以大大简化代码&#xff0c;使其更具可读性和可维护性&#xff0c;从而提高开发效率。本文将为您介绍 Java Stream 操作的所有方面&#xff0c;包括 ran…

交友项目【通用设置】三个功能实现

目录 1&#xff1a;交友项目【通用设置】 1.1&#xff1a;查询通用设置 1.1.1&#xff1a;接口地址 1.1.2&#xff1a;流程分析 1.1.3&#xff1a;代码实现 1.2&#xff1a;设置陌生人问题 1.2.1&#xff1a;接口地址 1.2.2&#xff1a;流程分析 1.2.3&#xff1a;代码…

Python 小型项目大全 51~55

五十一、九十九瓶的变体 原文&#xff1a;http://inventwithpython.com/bigbookpython/project51.html 在歌曲“九十九瓶”的这个版本中&#xff0c;该程序通过删除一个字母、交换一个字母的大小写、调换两个字母或重叠一个字母&#xff0c;在每个小节中引入了一些小的不完美。…