SpringBoot中XXL-JOB实现灵活控制的分片处理方案

news2025/1/18 7:22:33
❃博主首页 : 「码到三十五」 ,同名公众号 :「码到三十五」,wx号 : 「liwu0213」
☠博主专栏 : <mysql高手> <elasticsearch高手> <源码解读> <java核心> <面试攻关>
♝博主的话 : 搬的每块砖,皆为峰峦之基;公众号搜索「码到三十五」关注这个爱发技术干货的coder,一起筑基

场景

一个应用需要支持大量数据的批处理任务,要求:

  1. 并行处理能力:应用需能够同时处理多个数据块,即实现并行处理。
  2. 灵活的并发控制:可以灵活调整并行处理的任务数量,以确保资源利用最大化且不过载。
  3. 均衡负载分配:应将任务均匀分配到不同的服务器节点上,以平衡各节点的负载,避免单点压力过大。

解决思路

因为需要并行处理同一张数据表里的数据,所以比较自然地想到了分片查询数据,可以利用对 id 取模的方法进行分片,避免同一条数据被重复处理。那XXL-JOB 的路由策略「分片广播 & 动态分片」很贴合这种场景」来调度定时任务;

实现DEMO

SpringBoot环境下,我们集成xxl-job来实现上述方案。
SpringBoot如何集成xxl-job查看官网即可,这里不再叙述,下面看下分片调度的代码:

1.xxl-job调度管理页面配置分片调度任务

路由策略选择: 分片广播
在这里插入图片描述

2. 编写task代码:

要获取分片总数和当前分片序号,作为参数传给sql语句:

	@Resource
    private OrderDataMapper orderDataMapper;
	
    @XxlJob("orderDataStatusTask")
    public void orderDataStatusTask() {
		// 计时器
        Stopwatch timer = Stopwatch.createStarted();
        
		// 获取xxl-job的localThread中的总的分片数和当前分片
		OrderDataParam param = new OrderDataParam();
        param.setShardIndex(XxlJobHelper.getShardIndex());
        param.setShardTotal(XxlJobHelper.getShardTotal());
        // 其他参数设置,略了....
        
		// 根据分片数拉取当前分片的数据
        List<OrderData> orderDataList = orderDataMapper.getInitStatusOrder(param);
        XxlJobHelper.log("获取待处理订单数据:分片号={},数据量={},总分片数={}", XxlJobHelper.getShardIndex(), orderDataList.size(), XxlJobHelper.getShardTotal());
        if (CollUtil.isEmpty(orderDataList)) {
            return;
        }
        // 处理逻辑,略了....
        
        XxlJobHelper.log("当前分片({})处理完成,耗时={}秒", XxlJobHelper.getShardIndex(), timer.stop().elapsed(TimeUnit.SECONDS));
    }

这里服务启动了4个实例,总分片数ShardTotal就是4,每个实例的ShardIndex分别是0,1,2,3
在这里插入图片描述

3. mybatis中编写sql语句

根据分片总数和当前分片数据对Id哈希取模, 这里做了两次hash,主要作用是用id最后一位hash方便直接看出数据被哪个分片调度了。

	// 获取未处理的订单数据
	// 根据id末位数取hash后分片拉取
	<select id="getInitStatusOrder" parameterType="com.xxx.OrderDataParam"
            resultType="com.xxx.OrderData">
			select id,order_no,customer_code,
            from tt_order_data t
            where t.status = 0
				  and t.fail_count <![CDATA[ < ]]> #{retryCount}
				  and t.update_time <![CDATA[ >= ]]> #{lastUpdateTime}
				  and mod(mod(t.id, 10) , #{shardTotal}) = #{shardIndex}
			limit 0,200	
	</select>	
4.最后看下调度日志

同一次调度任务,4个实例个调度一次,并且拉取到各自部分的数据进行处理:

第3个实例的调度日志:

	    2024-09-25 08:31:40 [com.xxl.job.core.thread.JobThread#run]-[130]-[Thread-144] 
		----------- xxl-job job execute start -----------
		----------- Param:{"lastHoursAgoModify":4,"rows":3000,"lastMonthAgoCreate":6,"retryCount":1}
		2024-09-25 08:31:40 [com.xxx.xxxx#orderDataStatusTask]-[47]-[Thread-144] 获取待处理订单数据:分片号=3,数据量=100,总分片数=4
		2024-09-25 08:31:41 [com.xxx.xxxx#orderDataStatusTask]-[53]-[Thread-144] 当前分片(3)处理完成,耗时=12024-09-25 08:31:41 [com.xxl.job.core.thread.JobThread#run]-[176]-[Thread-144] 
		----------- xxl-job job execute end(finish) -----------
		----------- Result: handleCode=200, handleMsg = null
		2024-09-25 08:31:41 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread] 
        ----------- xxl-job job callback finish.

第4个实例的调度日志:

	    2024-09-25 08:31:40 [com.xxl.job.core.thread.JobThread#run]-[130]-[Thread-144] 
		----------- xxl-job job execute start -----------
		----------- Param:{"lastHoursAgoModify":4,"rows":3000,"lastMonthAgoCreate":6,"retryCount":1}
		2024-09-25 08:31:40 [com.xxx.xxxx#orderDataStatusTask]-[47]-[Thread-144] 获取待处理订单数据:分片号=4,数据量=80,总分片数=4
		2024-09-25 08:31:41 [com.xxx.xxxx#orderDataStatusTask]-[53]-[Thread-144] 当前分片(4)处理完成,耗时=12024-09-25 08:31:41 [com.xxl.job.core.thread.JobThread#run]-[176]-[Thread-144] 
		----------- xxl-job job execute end(finish) -----------
		----------- Result: handleCode=200, handleMsg = null
		2024-09-25 08:31:41 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread] 
        ----------- xxl-job job callback finish.

关注公众号[码到三十五]获取更多技术干货 !

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2165616.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

9.25盒马鲜生一面

1.自我介绍 2.css两种盒子模型 ​3.rem和em 4.px概念 5.transition和animation的区别 6.移动端适配方案 7.vh、vw、% 8.js基本数据类型 9.call、apply、bind的区别 10.js实现继承的方法 11.get和post的区别 12.web安全&#xff08;XSS&#xff0c;CSRF&#xff09; …

甩锅笔记:好好的服务端应用突然起不来,经定位是无法访问外网了?测试又说没改网络配置,该如何定位?

在工作中、团队协作时&#xff0c;可能遇到的问题&#xff0c;如集成测试等场景。但是作为偏前端的全栈&#xff0c;锅从天上来&#xff0c;不是你想甩就能甩&#xff0c;尤其面对测试等比较强势的团体&#xff08;bug创造者&#xff09;&#xff0c;你必须有强大的心理承受能力…

Vscode Run Code Py中文乱码问题

F1打开命令行界面&#xff0c;找到settings.json文件&#xff1b;选Workspace这个 找/直接输"code-runner.executorMap" python值改为"$pythonPath $fullFileName"

进程和线程的区别;线程的多种创建方式;Thread 类及常见方法;线程的状态

文章目录 进程和线程的区别线程的创建方式继承Thread&#xff0c;重写run&#xff08;创建单独的类/匿名内部类&#xff09;实现Runnable&#xff0c;重写run&#xff08;创建单独的类/匿名内部类&#xff09;实现Callable&#xff0c;重写call&#xff08;创建单独的类/匿名内…

828华为云征文 | 云服务器Flexus X实例,Docker集成搭建 Jupyter Notebook

828华为云征文 | 云服务器Flexus X实例&#xff0c;Docker集成搭建 Jupyter Notebook Docker 部署 Jupyter Notebook 是一个方便且快速的方式&#xff0c;可以帮助你搭建一个用于数据分析、机器学习和科学计算的环境 华为云端口放行 服务器放行对应端口9955 Docker安装并配置镜…

计算物理精解【1】-C++计算(1)

文章目录 基础hello,worldgetlinestd::cin引用与指针函数数据类型基本数据类型更多类型sizeof 正则表达式单次匹配多次匹配组匹配字符串的匹配 splitmap基础实战整型变量符号表简单分析生成整型变量表 正则表达式基础regex_matchregex_replaceswap Eigen概述简单例子Matrix基础…

DNS协议解析

DNS协议解析 什么是DNS协议 IP地址&#xff1a;一长串唯一标识网络上的计算机的数字 域名&#xff1a;一串由点分割的字符串名字 网址包含了域名 DNS&#xff1a;域名解析协议 IP>域名 --反向解析 域名>IP --正向解析 域名 由ICANN管理&#xff0c;有级别&#xf…

2.1 HuggingFists系统架构(二)

部署架构 上图为HuggingFists的部署架构。从架构图可知&#xff0c;HuggingFists主要分为服务器(Server)、计算节点(Node)以及数据库(Storage)三部分。这三部分可以分别部署在不同的机器上&#xff0c;以满足系统的性能需求。为部署方便&#xff0c;HuggingFists社区版将这三部…

Python | Leetcode Python题解之第419题棋盘上的战舰

题目&#xff1a; 题解&#xff1a; class Solution:def countBattleships(self, board: List[List[str]]) -> int:return sum(ch X and not (i > 0 and board[i - 1][j] X or j > 0 and board[i][j - 1] X)for i, row in enumerate(board) for j, ch in enumerat…

vue全局注册和局部注册的区别

1.全局注册&#xff1a; a、b为注册的组件 2.局部注册 3.有图可以看出&#xff0c;全局注册会影响白屏时间

2024最新Python Debugger工具pdb的用法(深度学习项目),了解输入输出的形状大小

侵入式方法 &#xff08;在被调试的代码中添加以下代码然后再正常运行代码&#xff09; import pdb pdb.set_trace() 例如&#xff1a; 正常运行训练文件后&#xff1a; 在命令行发现输出以下内容&#xff1a; 出现了(Pdb) 的提示符&#xff0c;说明已经打开pdb 在使用Pyth…

解决fatal: unable to access ‘https://........git/‘: Recv failure: Operation time

目录 前言 解决方法一 解决方法二 解决方法三 解决方法四 总结 前言 在使用 Git 进行代码拉取时&#xff0c;可能会遇到连接超时的问题&#xff0c;特别是在某些网络环境下&#xff0c;例如公司网络或防火墙严格的环境中。这种情况下&#xff0c;Git 无法访问远程仓…

红帽认证会过期吗?一文给你解释清楚!

红帽认证以其权威性、实用性和高含金量受到了广大IT人士的青睐&#xff0c;尤其是Linux领域。然而&#xff0c;许多人在考取红帽认证后&#xff0c;心中都有一个疑问&#xff1a;红帽认证会过期吗?本文将为大家详细解答这个问题。 红帽认证是什么? 红帽认证是由红帽公司(Re…

Windows 2003系统的防护技巧,禁止IPC$空连接

一、修改管理员帐号和新建“陷阱”帐号 多年以来&#xff0c;微软一直在强调建议重命名Administrator账号并禁用Guest账号&#xff0c;提高计算机的安全性。Windows Server 2003系统&#xff0c;Guest 账号是默认禁用的&#xff0c;管理员账号默认是Administrator&#xff0c;…

html 几行的空间分成3个区域

1.代码 <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta name"viewport" content"widthdevice-width, initial-scale1.0"> <title>三个区域示例</title> …

深入解析Python 中的 sortedcontainers 库:高效的排序数据结构

在日常的 Python 编程中&#xff0c;列表&#xff08;list&#xff09;、集合&#xff08;set&#xff09;和字典&#xff08;dict&#xff09;是常用的数据结构。然而&#xff0c;在某些特定的场景下&#xff0c;我们需要对数据进行排序&#xff0c;并且希望在插入、删除或访问…

4.2章节python中选择结构

选择结构主要通过if、elif&#xff08;else if的缩写&#xff09;和else语句来实现。这些语句允许程序根据条件执行不同的代码块。另外还有表达式中多个条件连接等。 一、基本语句if if 语句后面跟一个条件表达式&#xff0c;如果条件为真&#xff08;True&#xff09;&#…

Machine Learning Specialization 学习笔记(4)

文章目录 前言一、模型评估训练集常规训练集线性回归逻辑回归 交叉验证集 偏差与方差正则化 学习曲线数据集的添加&#xff08;数据增强&#xff09;迁移学习精确率与召回率 二、决策树基本概念决策树的工作原理决策树的优点决策树的缺点决策树算法的变体决策树在Python中的实现…

OpenCV4.8 开发实战系列专栏之 01- 环境搭建与图像读写

大家好&#xff0c;欢迎大家学习OpenCV4.8 开发实战专栏&#xff0c;长期更新&#xff0c;不断分享源码。 专栏代码全部基于C 与Python双语演示&#xff0c;专栏答疑群 请联系微信 OpenCVXueTang_Asst 本文关键知识点&#xff1a; 开发环境搭建、读取图像与显示图像,读取图像…

什么是换电连接器

换电连接器是一种高压连接器&#xff0c;安装在整车中&#xff0c;属于车载连接器。它利用导体和绝缘体的特性&#xff0c;将电能和数据信号从换电站设备传输到电动汽车的电池组&#xff0c;实现能源的快速补充。换电连接器通常由插头和插座两部分组成&#xff0c;通过精密的对…