elastic -job和springboot集成实现分布式调度5

news2024/11/25 6:58:21

一   案例介绍说明

1.1 案例介绍

基于 Spring boot 集成方式的而产出的工程代码,完成对作业分片的实现,文件数据备份采取更接近真实项目的数 据库存取方式。
1.分片设置

2.每个线程获取给自的类型

二 操作说明

2.1 数据表的初始化

DROP TABLE IF EXISTS `t_file`;
CREATE TABLE `t_file` (
`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`type` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`backedUp` tinyint(1) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

如图所示:

2.2 初始化数据

2.3 pom文件的编写

   <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

2.4 编写注册配置类

1.截图如下

2.代码

package com.itheima.scheduler.elasticjob.springboot.config;

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.itheima.scheduler.elasticjob.springboot.job.FileBackupJobDb;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;

/**
 * @author Administrator
 * @version 1.0
 **/
@Configuration
public class ElasticJobConfig {

    @Autowired
    private DataSource dataSource; //数据源已经存在,直接引入

//    @Autowired
//    SimpleJob fileBackupJob;

    @Autowired
    FileBackupJobDb fileBackupJob;

//    @Autowired
//    FileBackupJobDataFlow fileBackupJob;

    @Autowired
    CoordinatorRegistryCenter registryCenter;

    /**
     * 配置任务详细信息
     * @param jobClass 任务执行类
     * @param cron  执行策略
     * @param shardingTotalCount 分片数量
     * @param shardingItemParameters 分片个性化参数
     * @return
     */
    private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                        final String cron,
                                                        final int shardingTotalCount,
                                                        final String shardingItemParameters){
        //JobCoreConfigurationBuilder
        JobCoreConfiguration.Builder JobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
        //设置shardingItemParameters
        if(!StringUtils.isEmpty(shardingItemParameters)){
            JobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfigurationBuilder.build();
        //创建SimpleJobConfiguration
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
        //创建LiteJobConfiguration
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true)
                .monitorPort(9888)//设置dump端口
                .build();
        return liteJobConfiguration;
    }

    //创建支持dataFlow类型的作业的配置信息
    private LiteJobConfiguration createFlowJobConfiguration(final Class<? extends ElasticJob> jobClass,
                                                        final String cron,
                                                        final int shardingTotalCount,
                                                        final String shardingItemParameters){
        //JobCoreConfigurationBuilder
        JobCoreConfiguration.Builder JobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
        //设置shardingItemParameters
        if(!StringUtils.isEmpty(shardingItemParameters)){
            JobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfigurationBuilder.build();
        // 定义数据流类型任务配置
        DataflowJobConfiguration jobConfig = new DataflowJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName(),true);
        //创建LiteJobConfiguration
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(jobConfig).overwrite(true)
                .monitorPort(9888)//设置dump端口
                .build();
        return liteJobConfiguration;
    }
    @Bean(initMethod = "init")
    public SpringJobScheduler initSimpleElasticJob() {
        // 增加任务事件追踪配置
        JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);
        //创建SpringJobScheduler

        SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileBackupJob, registryCenter,
                createJobConfiguration(fileBackupJob.getClass(), "0/10 * * * * ?", 4, "0=text,1=image,2=radio,3=vedio")
                ,jobEventConfig);
        return springJobScheduler;
    }
}

3.代码

package com.itheima.scheduler.elasticjob.springboot.config;

import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Administrator
 * @version 1.0
 **/
@Configuration
public class ElasticJobRegistryCenterConfig {

    //zookeeper链接字符串 localhost:2181
    private  String ZOOKEEPER_CONNECTION_STRING = "localhost:2181" ;
    //定时任务命名空间
    private  String JOB_NAMESPACE = "elastic-job-example-java";

    //zk的配置及创建注册中心
    @Bean(initMethod = "init")
    public  CoordinatorRegistryCenter setUpRegistryCenter(){
        //zk的配置
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);

        //创建注册中心
        CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return zookeeperRegistryCenter;

    }
}

2.5 启动类

2.6 配置连接

2.7 启动zk

2.8 测试

1.启动application服务,查看打印log:

三 多实例测试

3.1 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1255578.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法萌新闯力扣】:回文链表

力扣题目&#xff1a;回文链表 开篇 今天是备战蓝桥杯的第23天。我加入的编程导航算法通关村也在今天开营啦&#xff01;那从现在起&#xff0c;我的算法题更新会按照算法村的给的路线更新&#xff0c;更加系统。大家也可以关注我新开的专栏“算法通关村”。里面会有更全面的知…

VMware安装部署kail镜像服务器【详细包含百度云盘镜像】

VMware安装部署kail镜像服务器【详细包含百度云盘镜像】 kail是一个很好玩的操作系统&#xff0c;不多说了哈 下载kail镜像 kail官网:https://www.kali.org/get-kali/#kali-platforms 百度云盘下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1PRjoP_1v8DEZ7-dA_…

python桌面开发PyQt6库和工具库QTDesigner安装和配置

一、安装PyQt6 二、安装pyqt6-tools 三、安装外部工具 四、创建QTDesigner 1.首先查找designer.exe的路径&#xff08;可以自己在窗口中选择&#xff0c;也可以使用Everything搜索&#xff09; 2.使用Everything搜索后会出现多个designer.exe,选中&#xff0c;OpenPath 3.选择…

讯飞星火知识库文档问答Web API的使用(二)

上一篇提到过星火spark大模型&#xff0c;现在有更新到3.0&#xff1a; 给ChuanhuChatGPT 配上讯飞星火spark大模型V2.0&#xff08;一&#xff09; 同时又看到有知识库问答的web api&#xff0c;于是就测试了一下。 下一篇是在ChuanhuChatGPT 中单独写一个基于星火知识库的内容…

【Web-Note】 JavaScript概述

JavaSript基本语法 JavaSript程序不能独立运行&#xff0c;必须依赖于HTML文件。 <script type "text/javascript" [src "外部文件"]> JS语句块; </script> script标记是成对标记。 type属性&#xff1a;说明脚本的类型。 "text/jav…

rdf-file:分布式环境下的文件处理

一&#xff1a;数据量大了以后&#xff0c;单机解析或者生成文件的效率就很低&#xff0c;需要通过集群处理 机构过来的文件&#xff1a;我们先对文件进行分片&#xff0c;在利用集群集群处理分片文件。给机构文件&#xff1a;分库分表数据&#xff0c;每个分表生成一个分片文…

什么是分布式锁?Redis实现分布式锁详解

目录 前言&#xff1a; 分布式系统买票示例 引入redis做分布式锁 引入过期时间 引入校验id 引入lua脚本 过期时间续约问题 redlock算法 小结&#xff1a; 前言&#xff1a; 在分布式系统中&#xff0c;涉及多个主机访问同一块资源&#xff0c;此时就需要锁来做互斥控制…

Linux篇:文件系统

一、共识原理&#xff1a; 文件文件内容文件属性 磁盘上存储文件存文件的内容&#xff08;数据块&#xff09;存文件的属性&#xff08;inode&#xff09; Linux的文件在磁盘中存储是将属性和内容分开存储的。 二、硬件简述&#xff1a; 1. 认识硬件 磁盘&#xff1a;唯一的一…

Vue+ElementUI技巧分享:周数选择器

文章目录 前言一、周数的计算逻辑1.1 周数的定义1.2 年初周数的确定1.3 周数的计算方法 二、代码实现2.1 计算周数2.2 获取周的日期范围2.3 根据周数获取日期范围2.4 控件引用2.4.1 控件引用代码分析2.4.2 初始化变量代码分析 2.5 周数选择器完整代码 总结 前言 Vue.js和Eleme…

文心一言 VS 讯飞星火 VS chatgpt (143)-- 算法导论12.1 3题

三、用go语言&#xff0c;设计一个执行中序遍历的非递归算法。&#xff08;提示:一种容易的方法是使用栈作为辅助数据结构;另一种较复杂但比较简洁的做法是不使用栈&#xff0c;但要假设能测试两个指针是否相等。&#xff09; 文心一言&#xff0c;代码正常运行&#xff1a; …

Linux 面试题(一)

目录 1、绝对路径用什么符号表示&#xff1f;当前目录、上层目录用什么表示&#xff1f;主目录用什么表示? 切换目录用什么命令&#xff1f; 2、怎么查看当前进程&#xff1f;怎么执行退出&#xff1f;怎么查看当前路径&#xff1f; 3、怎么清屏&#xff1f;怎么退出当前命…

4G模块(EC600N)通过MQTT连接华为云

目录 一、前言 二、EC600N模块使用 1&#xff0e;透传模式 2&#xff0e;非透传模式 3、华为云的MQTT使用教程&#xff1a; 三、具体连接步骤 1、初始化检测 2、打开MQTT客户端网络 3、创建产品 4、创建模型 5、注册设备 6、连接客户端到MQTT服务器 7、发布主题消…

【数据分享】我国12.5米分辨率的坡向数据(免费获取)

地形数据&#xff0c;也叫DEM数据&#xff0c;是我们在各项研究中最常使用的数据之一。之前我们分享过源于NASA地球科学数据网站发布的12.5米分辨率DEM地形数据&#xff01;基于该数据我们处理得到12.5米分辨率的坡度数据、12.5米分辨率的山体阴影数据&#xff08;均可查看之前…

Python 安装mysqlclient 错误 无法打开包括文件: “mysql.h”: 解决方法

解决方案&#xff1a;python最新3.12.0不支持mysqlclient 请下载 python3.9.9 版本 高速下载地址CNPM Binaries Mirror 官方下载地址Welcome to Python.org 下载完成后将python添加到环境变量 pycharm 虚拟环境下的python版本切换到你刚才下载的3.9.9的python版本 Avai…

C语言标准

1、概述 C语言标准是由ANSI&#xff08;美国国家标准协会&#xff09;和ISO&#xff08;国际标准化组织&#xff09;共同制定的一种语言规范。标准经历过如下更新&#xff1a; C89/C90标准C99标准C11标准C17标准 2、C89/C90标准 (1)这是1989年正式发布的C语言标准&#xff0…

使用项目管理工具进行新媒体运营管理的策略与方法

使用Zoho Projects项目管理工具&#xff0c;新媒体运营可轻松驾驭从策划选题、撰写到排期发布的全流程。运用项目管理工具对新媒体运营进行精细化管理&#xff0c;助力团队更高效地规划、执行和追踪各项任务与活动。 以下是运用项目管理工具管理新媒体运营的妙招&#xff1a; 1…

Java进阶(第二期):package 包 抽象类和抽象方法 接口的实现 多态的实现 综合继承、接口、多态的使用。

2023年11月26日20:11:11 文章目录 Java进阶&#xff08;第二期&#xff09;一、package包的概念二、抽象类和抽象方法(abstract)2.1 使用2.1 抽象类注意事项 三、接口3.1 接口的定义格式3.2 接口成员特点3.3 类和接口的关系3.4 接口和抽象类的对比 四、多态4.1 多态的前提条件4…

2009年iMac装64位windows7及win10

2009年iMac装64位windows7及win10 Boot Camp没有“创建 Windows7 或更高版本的安装磁盘”选项 安装完Mac OS系统后&#xff0c;要制作Windows7安装U盘时才发现&#xff0c;Boot Camp没有“创建 Windows7 或更高版本的安装磁盘”选项&#xff0c;搜索到文章&#xff1a;修改Boo…

Mac 最佳使用指南

如何在macOS系统安装根证书mac Terminal config proxy 【mac 终端配置代理】iPhone 安装 iOS 17公测版&#xff08;Public Beta)macOS 最佳命令行客户端&#xff1a;iTermMac 配置与 Linux 互信Mac mini 外接移动硬盘无法写入或者无法显示的解决方法如何在 macOS 美化 iterm2 &…