工具篇--分布式定时任务springBoot--elasticjob简单使用(1)

news2024/11/16 21:42:08

文章目录

  • 前言
  • 一、elasticjob 介绍:
  • 二、elasticjob 使用:
    • 2.1 部署zookeeper:
    • 2.2 引入库
    • 2.2 定义任务:
    • 2.3 任务执行:
    • 2.4 任务执行控制台输出:
  • 三、elasticjob 启动错误:
    • 3.1 KeeperErrorCode = OperationTimeout:
    • 3.2 .HostException: ip is null:
  • 总结


前言

本文对 elasticjob 的简单使用进行介绍。


一、elasticjob 介绍:

ElasticJob 是一个分布式任务调度框架,由当当网开发并开源。它基于 Zookeeper 实现分布式协调,采用经典的分片算法,能够实现弹性扩容和缩容的分布式任务调度。ElasticJob 能够灵活地应用于各种环境中,如易用性、稳定性、弹性可伸缩等方面表现优异。

以下是 ElasticJob 的一些主要特性:

  1. 分布式任务调度:ElasticJob 基于 Zookeeper 实现分布式协调,支持分布式自动负载均衡调度。

  2. 弹性扩缩容:ElasticJob 支持弹性扩容和缩容,在任务节点伸缩时能够自动调整任务分片。

  3. 丰富的定时调度策略:ElasticJob 提供了各种灵活的调度策略,如简单的固定频率、CRON 等。

  4. 支持多种任务处理逻辑:ElasticJob 支持处理数据流、打印日志、脚本处理等多种任务处理逻辑。

  5. 统计和监控:ElasticJob 提供了完善的统计和监控功能,可以监控任务执行状态和运行情况。

总体来说,ElasticJob 是一个功能丰富、可靠且易于集成的分布式任务调度框架,广泛应用于企业系统中的定时任务调度、数据处理等场景。

二、elasticjob 使用:

2.1 部署zookeeper:

因为job需要注册到zk 上,依赖于zk 的leader选举,所以需要先进行zk 的安装;
阿里云轻量服务器–Docker–Zookeeper&Kafka安装;
window Zookeeper zk 启动;

2.2 引入库


<!-- https://mvnrepository.com/artifact/org.apache.shardingsphere.elasticjob/elasticjob-lite-core -->
<!-- 定时任务核心库  -->
<dependency>
   <groupId>org.apache.shardingsphere.elasticjob</groupId>
   <artifactId>elasticjob-lite-core</artifactId>
   <version>3.0.4</version>
   <!--<version>3.0.1</version>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.yaml/snakeyaml -->
<!-- SnakeYaml用于解析YAML  -->
<dependency>
   <groupId>org.yaml</groupId>
   <artifactId>snakeyaml</artifactId>
   <!--<version>1.27</version>-->
   <version>2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.codehaus.groovy/groovy-all -->
<!--在程序运行时任意修改代码逻辑  -->
<dependency>
   <groupId>org.codehaus.groovy</groupId>
   <artifactId>groovy-all</artifactId>
   <version>2.4.15</version>
</dependency>

2.2 定义任务:


import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class MyJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {

        // 分片参数 0=text,1=image,2=radio,3=vedio
        String  shardingParameter= shardingContext.getShardingParameter();
        String  jobParameter= shardingContext.getJobParameter();

        log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{},jobParamer:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(), shardingParameter,jobParameter);
        if ("text".equals(jobParameter)) {
            // do something by sharding
        }
        switch (shardingContext.getShardingItem()) {
            case 0:
                // do something by sharding item 0
                break;
            case 1:
                // do something by sharding item 1
                break;
            case 2:
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

2.3 任务执行:


import groovy.lang.GroovyShell;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.JobExtraConfiguration;
import org.apache.shardingsphere.elasticjob.infra.env.HostException;
import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.internal.snapshot.SnapshotService;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.event.JobEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
import org.springframework.util.StringUtils;


public class MyJobDemo {
    public static void main(String[] args) {
    	// 电脑连接无线网时 连接zk 可能出现ip is null 错误,此时设置一个ip给到zk
        shieldElasticjobIpIsNull();
        // 运行任务
        new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration()).schedule();
    }
	// 配置zookeeper 连接
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "my-job");
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(10000);
        zookeeperConfiguration.setSessionTimeoutMilliseconds(10000);
        zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000);

        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        regCenter.init();
        return regCenter;
        // ... 分片参数 分片从0开始到分片总数-1
    }
	// 配置job 运行时机
    private static JobConfiguration createJobConfiguration() {

        // 创建作业配置 
        /**
        * myjob-param job 的名称 同一个zk命名空间下 需要唯一
        * 1 分片个数
        * cron 任务运行的cron 表达式
        * overwrite 运行job 的配置被覆盖写入,默认为false
        * shardingItemParameters  分片参数(随后同 分片个数一同介绍)
        * jobParameter job的参数(job 业务端在执行任务的时候可以接收到该参数)
        **/
        JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-param", 1).cron("0/5 * * * * ?")
                .overwrite(true)
              //  .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
                .jobParameter("jobparamer")
                .build();
        return jobConfiguration;

    }

    /**
     * 屏蔽org.apache.shardingsphere.elasticjob.infra.env.IpUtils.getIp()抛出
     * HostException(ip is null) 的异常导致windows本地程序无法启动
     */
    private static void shieldElasticjobIpIsNull(){
        try {
            IpUtils.getIp();
        } catch (HostException e) {
            //抛出HostException 且 异常信息为 "ip is null" 时,设置ip地址为 0.0.0.0
            if("ip is null".equals(e.getMessage())){
                String code = "org.apache.shardingsphere.elasticjob.infra.env.IpUtils.cachedIpAddress=\"0.0.0.0\";";
                GroovyShell groovy = new GroovyShell();
                groovy.evaluate(code);
            }
        }
    }

}

2.4 任务执行控制台输出:

在这里插入图片描述

三、elasticjob 启动错误:

3.1 KeeperErrorCode = OperationTimeout:

在这里插入图片描述
报错位置在 ZookeeperRegistryCenter 的 init() 方法中:
在这里插入图片描述
这里等待一段时间后如果还没有连接到zk 就会报错,默认的等待时间是 3000ms * 3 = 9s ,此时可以考虑增加 maxSleepTimeMilliseconds 的时间:

 private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("139.196.92.249:2181", "my-job");
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(10000);
        zookeeperConfiguration.setSessionTimeoutMilliseconds(10000);
        // 增加 maxSleepTimeMilliseconds  时间
        zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000);

        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        regCenter.init();
        return regCenter;
        // ... 分片参数 分片从0开始到分片总数-1
    }

3.2 .HostException: ip is null:

在这里插入图片描述

错误代码:
在这里插入图片描述
这里会会获取到本机的ip 进行遍历找到一个符合要求的ip 然后进行返回,如果所有的ip 都不通过,则抛出ip is null 的问题;
处理:引入: groovy-all

<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-all</artifactId>
    <version>2.4.15</version>
</dependency>

编写如下方法,目的只是为了在特殊情况下改变cachedIpAddress的值:

/**
 * 屏蔽org.apache.shardingsphere.elasticjob.infra.env.IpUtils.getIp()抛出
 * HostException(ip is null) 的异常导致windows本地程序无法启动
 */
private static void shieldElasticjobIpIsNull(){
    try {
        IpUtils.getIp();
    } catch (HostException e) {
        //抛出HostException 且 异常信息为 "ip is null" 时,设置ip地址为 0.0.0.0
        if("ip is null".equals(e.getMessage())){
            String code = "org.apache.shardingsphere.elasticjob.infra.env.IpUtils.cachedIpAddress=\"0.0.0.0\";";
            GroovyShell groovy = new GroovyShell();
            groovy.evaluate(code);
        }
    }
}

启动类的main方法内部一开始就调用上面这个shieldElasticjobIpIsNull初始化cachedIpAddress:

public static void main(String[] args) {
 //屏蔽org.apache.shardingsphere.elasticjob.infra.env.IpUtils.getIp()抛出
    //HostException(ip is null) 的异常导致windowes本地程序无法启动的问题
    shieldElasticjobIpIsNull();
    new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration()).schedule();

}

总结

本文对 elasticjob 的简单使用进行介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1508541.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构->双向链表带你体验开火车(哨兵)与拼接火车(应用)厢的乐趣

✅作者简介&#xff1a;大家好&#xff0c;我是橘橙黄又青&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;橘橙黄又青-CSDN博客 目的&#xff1a;学习双向带头链表的增&#xff0c;删&#xff0c;查&#xff0c;销毁…

Vue+SpringBoot打造个人健康管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 健康档案模块2.2 体检档案模块2.3 健康咨询模块 三、系统展示四、核心代码4.1 查询健康档案4.2 新增健康档案4.3 查询体检档案4.4 新增体检档案4.5 新增健康咨询 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpri…

PyTorch 源码解读之 torch.cuda.amp: 自动混合精度详解

PyTorch 源码解读之 torch.cuda.amp: 自动混合精度详解 Nvidia 在 Volta 架构中引入 Tensor Core 单元&#xff0c;来支持 FP32 和 FP16 混合精度计算。也在 2018 年提出一个 PyTorch 拓展 apex&#xff0c;来支持模型参数自动混合精度训练。自动混合精度&#xff08;Automati…

2024.03.11作业

1. 提示并输入一个字符串&#xff0c;统计该字符串中大写小写字母个数&#xff0c;数字个数&#xff0c;空格个数以及其他字符个数&#xff0c;要求使用c风格字符串完成 #include <iostream> #include <string>using namespace std;int main() {cout << &qu…

蓝桥杯2023年第十四届Java省赛真题-矩形总面积

题目描述 平面上有个两个矩形 R1 和 R2&#xff0c;它们各边都与坐标轴平行。设 (x1, y1) 和(x2, y2) 依次是 R1 的左下角和右上角坐标&#xff0c;(x3, y3) 和 (x4, y4) 依次是 R2 的左下角和右上角坐标&#xff0c;请你计算 R1 和 R2 的总面积是多少&#xff1f; 注意&…

设计模式深度解析:工厂方法模式与抽象工厂模式的深度对比

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;坚持默默的做事。 探索设计模式的魅力&#xff1a;工厂方法模式文章浏览阅读17k次&#xff0c;点赞105次&#xff0…

根据xlsx文件第一列的网址爬虫(selenium)

seleniumXpath 在与该ipynb文件同文件下新增一个111.xlsx&#xff0c;第一列放一堆需要爬虫的同样式网页 然后使用seleniumXpath爬虫 from selenium import webdriver from selenium.webdriver.common.by import By import openpyxl import timedef crawl_data(driver, url)…

2024年零基础自学网络安全/Web安全,看这一篇就够了

作为一个安全从业人员&#xff0c;我自知web安全的概念太过于宽泛&#xff0c;我本人了解的也并不够精深&#xff0c;还需要继续学习。 但又不想新入行的人走弯路&#xff0c;所以今天随手写写关于web安全的内容&#xff0c;希望对初次遇到web安全问题的同学提供帮助&#xff…

334.递增的三元子序列

题目&#xff1a;给你一个整数数组 nums &#xff0c;判断这个数组中是否存在长度为 3 的递增子序列。 如果存在这样的三元组下标 (i, j, k) 且满足 i < j < k &#xff0c;使得 nums[i] < nums[j] < nums[k] &#xff0c;返回 true &#xff1b;否则&#xff0c;…

Nginx+keepalived实现七层的负载均衡的高可用

目录 Nginxkeepalived实现七层的负载均衡的高可用 一、准备服务器 1、主机清单 2、配置安装nginx 所有的机器&#xff0c;关闭防火墙和selinux 3.安装nginx&#xff0c; 全部4台 二、部署负载均衡 1、修改nginx的配置文件&#xff0c;添加以下内容&#xff0c; 2、重启n…

APP自动化测试-Appium Inspector入门操作指南

上一篇博客APP自动化测试-入门示例-CSDN博客介绍了APP自动化测试的入门示例,下面详细介绍下Appium 实现的页面元素查看器工具:Appium Inspector的使用方法。 Appium Inspector简介 Appium Inspector 是 Appium 测试框架中的一个工具,用于可视化和调试移动应用程序的 UI 结…

污水处理厂重金属废水深度处理CH-90树脂处理系统

项目名称 广东某工业污水处理厂重金属废水深度处理工程项目 工艺选择 科海思重金属深度处理工艺 工艺原理 离子交换吸附 项目背景 随着环保要求不断提高&#xff0c;工业废水处理已成为众多企业的必修课。然而在工业生产中&#xff0c;如何有效处理含有重金属的废水成为…

结构化思维助力Prompt创作:专业化技术讲解和实践案例

最早接触 Prompt engineering 时, 学到的 Prompt 技巧都是: 你是一个 XX 角色… 你是一个有着 X 年经验的 XX 角色… 你会 XX, 不要 YY.. 对于你不会的东西, 不要瞎说!…对比什么技巧都不用, 直接像使用搜索引擎一样提问, 上面的技巧对于回复的效果确实有着 明显提升. 在看了 N…

【CSS面试题】外边距折叠的原因和解决

参考文章 什么时候出现外边距塌陷 外边距塌陷&#xff0c;也叫外边距折叠&#xff0c;在普通文档流中&#xff0c;在垂直方向上的2个或多个相邻的块级元素&#xff08;父子或者兄弟&#xff09;外边距合并成一个外边距的现象&#xff0c;不过只有上下外边距才会有塌陷&#x…

Xinstall CPA结算系统:精准追踪,轻松提升广告ROI

在如今的移动互联网时代&#xff0c;App推广已经成为各大企业获取用户、扩大市场份额的重要手段。然而&#xff0c;随着推广渠道的多样化&#xff0c;如何精准评估各渠道的效果、优化广告投放策略&#xff0c;以及提升用户体验&#xff0c;成为了摆在推广者面前的难题。 这时…

R语言绘制桑基图教程

原文链接&#xff1a;R语言绘制桑基图教程 写在前面 在昨天3月10日&#xff0c;我们在知乎、B站等分享了功能富集桑基气泡图的绘制教程。相关链接&#xff1a;NC|高颜值功能富集桑基气泡图&#xff0c;桑基气泡组合图。 确实&#xff0c;目前这个图在文章中出现的频率相对比较…

YOLOv8模型改进4【增加注意力机制GAM-Attention(超越CBAM,不计成本地提高精度)】

一、GAM-Attention注意力机制简介 GAM全称:Global Attention Mechanism。它被推出的时候有一个响亮的口号叫做:超越CBAM,不计成本地提高精度。由此可见,它的主要作用是为了目标检测精度的提高。 但是,大家都明白,具体效果怎么样,还得看具体的任务,我浅浅地试了一下,…

SpringBoot +WebSocket应用

我们今天不研究原理&#xff0c;只看应用。 什么是WebSocket WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455&#xff0c;并由RFC7936补充规范。WebSocket API也被W3C定为标准。 WebSocket使得客户端和服务器之间的数…

微信小程序开发系列(二十)·wxml语法·setData()修改对象类型数据、ES6 提供的展开运算符、delete和rest的用法

目录 1. 新增单个、多个属性 1.1 新增单个属性 1.2 新增多个属性 2. 修改单个、多个属性 2.1 修改单个属性 2.2 修改多个属性 3. 优化 3.1 ES6 提供的展开运算符 3.2 Object.assign()将多个对象合并为一个对象 4. 删除单个、多个属性 4.1 删除单个属性 …

Spring揭秘:Environment接口应用场景及实现原理!

内容概要 Environment接口提供了强大且灵活的环境属性管理能力&#xff0c;通过它&#xff0c;开发者能轻松地访问和配置应用程序运行时的各种属性&#xff0c;如系统属性、环境变量等。 同时&#xff0c;Environment接口还支持属性源的定制和扩展&#xff0c;使得开发者能根…