跑批系统设计

news2025/1/10 23:33:59

需求分析

将大批量的数据,从一个地方,迁移到另外一个地方,如何处理
跑批系统

主要的涉及到的问题

  • 亿级数据怎么存
  • 怎么防止重复调度
  • 怎么做到负载均衡
  • 同一个节点,任务怎么并行
  • 如何动态调整并发度
  • 机器节点挂了怎么办

概要设计

数据存储

数据导出出来以后怎么存?一般是用文件存或者是数据库存。

数据库存储

对于mysql来说,单表不能太大,一亿行数据拆分成多个表,可以拆分十个表,每一个表大约一千万左右。
问题:

  • 遍历数据,对数据库压力巨大
  • 跑批数据需要删除,造成数据库的碎片问题
文件存储

采用文件系统存储,将文件拆分成多个,而且读取速度快,因为是磁盘顺序读取的

重复调度问题

调度系统我们一般采用比如xxl-job,通常都失败重试的功能,我们要做到访问重复调度的可能,这里可以设计一张表来存储当前任务,如何已经存,则抛弃掉。表的结构类似这样,还可以使用乐观锁更新状态。

防止重复调度

负载均衡

跑批系统怎么上集群,我们可以用MQ,把任务均匀的发布的队列上,在MQ的选型上,我们建议选择redis的list来实现,至于为什么?我们后面会说。
负载均衡

单节点任务并行

使用redis需要自己建一个Event Loop的线程,从redis队列中取任务,然后放入线程池中运行,因为我们使用redis队列缓冲,所以线程池的队列长度设置为0。
为什么选redis队列:
一般的分布式消息队列为了提高效率,会预先取很多消息放在本地,然后使用一个线程把消息放到线程池来处理,这就带来一个问题,如果你的任务太少了的话就会被一个客户端捞取很多任务,使用redis我们就不会有这个问题,因为Event Loop线程是任务进行完一个之后,才从redis里面再拿一个。
并行

###动态调整并发

动态调整任务并发,有两个地方:
1,是任务调用远程接口的速度,这个速度控制可以使用Thread.sleep就可以了
2,是任务并发度控制,有多少个线程同时在跑任务,也就是控制线程池数量,我们修改动态修改线程数可能比较困难。另外一个思路是使用开源组件,比较guava的RateLimiter组件,如下图示。

调整并发数

失败重试逻辑

任务失败原因:
1,机器重启,导致任务都终止了
2,调用远程接口失败,超过次数,任务就会停止
3,机器资源不足,内存溢出了

解决上面的问题,两个步骤:记录进度和任务重试

  • 记录进度,在子任务执行过程中,需要一直刷新执行到任务哪一行了,同时更新进度的时间。
  • 分布式任务系统增加一个补偿任务,定时扫描所有还在执行中的子任务,如果发现任务长时间没有更新,那说明任务终止了。将任务状态改成待执行,在放入消息队列就可以了。
    失败重试逻辑

详情设计

跑批任务表设计

我们讲大文件切割,然后得到若干个相对小的的文件。在linux中切割文件命令如下:

  1. 按行数切割split -l [行数] [输入文件] [输出文件前缀]。例如,如果你想将一个文件分割成每1万行一个文件,后缀是2位,子文件以child开头,可以使用命令:split -a 2 -l 10000 app.log child

  2. 按文件大小切割split -b [大小] [输入文件] [输出文件前缀]。例如,如果你想将一个log文件切割成每个1M大小,后缀是2位数字结尾的子文件,子文件以child开头,可以使用命令:split -a 2 -d -b 1M app.log.10 child

其他常用的选项包括:

  • -a, --suffix-length=N:指定后缀长度为N (默认为2)。
  • -d, --numeric-suffixes:使用数字后缀代替字母后缀。
  • --verbose:在每个输出文件打开前输出文件特征

然后上传的文件系统上,就可以得到文件url地址。
任务表设计信息如下:

CREATE TABLE `t_task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `file_name` varchar(64) NOT NULL DEFAULT '' COMMENT '文件名称',
  `task_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '0 是初始化 1 运行中 2 已完成',
  `task_progress` int(11) NOT NULL DEFAULT '0' COMMENT '读取进度',
  `file_url` varchar(128) NOT NULL DEFAULT '' COMMENT '保存文件的url',
  `ctime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `utime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

定时任务

我们可以在后台管理系统里面上传我们分割后的文件,文件表一般都是会有一个group的字段,这里文件我们单独设置一个组,以参数的形式在使用xxl-job动态的传到执行器里面。任务重复我们可以使用文件名称或者是文件地址作为唯一键,每生产一条任务向MQ投递一条消息。伪代码如下:

    @XxlJob("createBatchRunTask")
    public void createBatchRunTask(String fileGroup) throws Exception {
        List<File> files = fileMapper.findByGroup(fileGroup);
        if (CollectionUtils.isEmpty(files)) {
            logger.info("暂无文件 {}", fileGroup);
            return;
        }
        for (File f : files) {
          String fileName =  f.getName();
          String url = f.getFileUrl();
          Task t = new Task();
          t.setFileName(fileName);
            try {
                //略....
                taskMapper.insert(t);
                mqServer.send(t);
            } catch (Exception e) {
                 if (e instanceof SQLIntegrityConstraintViolationException)   logger.error("重复任务");
                 else throw e;
              
            }
        }
        // default success
    }

任务处理过程

在监听redis队列中,我们拿到对应的任务,这时候我们需要指定任务的下载的文件目录,同时判断该处理的任务是否存在文件,如果存在可能之前处理过的。如果不存在我们之间下载下来就可以了。
这里就涉及到一个问题,任务可能是补偿任务,不一定是从0行开始的,同时我们是用FileReader+ BufferedReader 逐行读取,这里可以选择两种方案:

  • FileReader+ BufferedReader根据任务进度,将已经读过的数据直接跳过不处理,但是还是需要重0行开始读取
  • 要更有效地跳过文件的一部分,可以使用LineNumberReader类,它是BufferedReader的子类,提供了一个setLineNumber(int lineNumber)方法,可以让你直接跳到指定的行。

在逐行读取数据以后,我们需要调用远程接口,一条一条的处理,同时将结果入库。这里我们需要做对应的设计,在落库的时候,我们是批量入库的,我们需要设置数据库刷盘策略innodb_flush_log_at_trx_commit=2每隔1s刷新到磁盘。同时我们设置mysql的批量插入参数为rewriteBatchedStatements=true,同时关闭调自动提交。
代码示例如下:

Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/myDatabase?rewriteBatchedStatements=true", "username", "password");
conn.setAutoCommit(false);
String sql = "INSERT INTO myTable VALUES (?, ?)";
PreparedStatement pstmt = conn.prepareStatement(sql);

for (int i = 0; i < 10000; i++) {
    pstmt.setString(1, "column1Value");
    pstmt.setString(2, "column2Value");
    pstmt.addBatch();

    if (i % 500 == 0) {
        pstmt.executeBatch();
    }
}

pstmt.executeBatch();
conn.commit();

但是对应事务的刷盘策略设置,有严格的控制权限,运维老哥也不会让你随便修改数据库的配置,如果这样的话,我们只能使用MyIsam了,如果数据量不大,还是使用innodb也是可以的。
我们在批量插入同时,需要记录当前任务进度详情,还有就是更新时间,方便我们将来排查问题,做任务补偿。

控制任务并发度

在我们资源不足的情况,我们希望任务处理的慢点,或者是不用那么快。这个时候我们需要动态修改对应的值。比如使用nacos或者zookeeper这写配置中心,把对应的参数让配置中心管理起来,动态修改以后能够及时生效的。在任务数量少的情况,我们使用限流组件或者是让任务睡眠是可以的。概要设计已经说的明确,这里不做赘述。
具体措施:

  • 任务处理过程中使用TimeUnit.SECODES.sleep();
  • 任务处理中guava的RateLimiter组件,它是规定时间内,只能有多少个线程处理
  • JDK自带并发控制工具,比如Semaphore

任务补偿

在各种情况下任务发生故障的时候,我们任务进度会停滞,我们需要再创建一个定时任务,扫描任务表中任务状态是运行中,并且是更新时间超过了十几分钟或者半个小时(这个可以自行设置对应的阈值),重新入队就可以了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2179391.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot整合MybatisPlus+MySQL

上一篇&#xff1a;springboot整合sentinel和对feign熔断降级 文章目录 一、准备二、主要工作三、具体步骤3.1 准备数据库环境3.20 pre引入依赖3.2 引入依赖3.3 bootstrap.yml配置mybatisplus3.40 pre引入service、mapper3.4 引入实体类、service、mapper 四、测试目录结构 五…

数据结构 ——— 单链表oj题:移除链表中所有 val 的元素

目录 题目要求 手搓简易单链表 代码实现 题目要求 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回新的头节点 手搓简易单链表 在实现以上逻辑函数前&#xff0c;要先手搓一个单链表出来&#xff…

iOS--App启动过程及优化

前言 App启动是用户对于一个app的第一印象&#xff0c;因此如何使用户在最短的时间打开进入app显得格外重要。启动优化因此成为了App调优至关重要的一项。 只有具体了解了App的启动过程&#xff0c;我们才能对其进行优化。 App启动过程 App启动分为冷启动和热启动 热启动&…

公司申请商标注册需要什么材料

申请商标注册的&#xff0c;应当向商标局提交《商标注册申请书》及其它文件&#xff0c;具体要求是&#xff1a; 1、申请人必须按一类商品一件商标一份申请的原则&#xff0c;提交《商标注册申请书》一份。也即一份申请书上填报的商品或服务只能限定在《商标注册用商品和服务国…

Debian 配置 Python 开发与运行环境

配置 Python 开发与运行环境。 1.3.1. Debian下的安装与配置 Debian 是一个致力于自由软件开发并宣扬自由软件基金会理念的自愿者组织。 Debian 计划创建于 1993 年。当时&#xff0c;Ian Murdock 发出一份公开信&#xff0c; 邀请软件开发者们参与构建一个基于较新的 Linux …

Java8/9/10/11新特性

目录 一、 Lambda表达式二、函数式(Functional)接口三、方法引用与构造器引用3.1、方法引用3.2 构造器引用和数组引用3.2.1 构造器引用3.2.2 数组引用 四、 强大的Stream API4.1 Stream API说明4.2 Stream 的操作三个步骤4.3 创建 Stream方式4.4 、Stream 的中间操作4.4.1 筛选…

Python | Leetcode Python题解之第447题回旋镖的数量

题目&#xff1a; 题解&#xff1a; class Solution:def numberOfBoomerangs(self, points: List[List[int]]) -> int:ans 0for p in points:cnt defaultdict(int)for q in points:dis (p[0] - q[0]) * (p[0] - q[0]) (p[1] - q[1]) * (p[1] - q[1])cnt[dis] 1for m i…

使用DolphinScheduler调度实现sqoop增量导入时遇到 Caused by:Class QueryResult not found 错误解决

解决方法&#xff1a; 拷贝一个 QueryResult.jar 到 sqoop 的 lib 下 【临时解决方案】 报错信息中有一个相关路径&#xff01;拷贝该路径下的QueryResult.jar到sqoop的lib下&#xff1a; cp /tmp/sqoop-root/compile/dc8e6e7d48be670d676323bf76fd9fe9/QueryResult.jar /op…

通信工程师笔记

第一章 1.支撑网是使业务网正常运行,增强网络功能,提供全网服务质量以满足用户要求的网络。 2.常见的有线通信线路包括&#xff08;1&#xff09;双绞线&#xff0c;&#xff08;2&#xff09;同轴电缆&#xff0c;&#xff08;3&#xff09;光纤等&#xff0c;无线通信线路是…

过渡到内存安全语言:挑战和注意事项

开放源代码安全基金会 ( OpenSSF )总经理 Omkhar Arasaratnam 讨论了内存安全编程语言的演变及其为应对 C 和 C 等语言的局限性而出现的现象。 内存安全问题已存在五十多年&#xff0c;它要求程序员从内存管理任务中抽离出来。 Java、Rust、Python 和 JavaScript 等现代语言通…

NLP_情感分类_机器学习(w2v)方案

文章目录 项目背景数据清洗导包导入数据切分评论及标签Word2Vec构造w2v 数据切分模型训练查看结果 同类型项目 项目背景 项目的目的&#xff0c;是为了对情感评论数据集进行预测打标。在训练之前&#xff0c;需要对数据进行数据清洗环节&#xff0c;前面已对数据进行清洗&…

数据权限的设计与实现系列11——前端筛选器组件Everright-filter集成功能完善2

‍ 筛选条件数据类型完善 文本类 筛选器组件给了一个文本类操作的范例&#xff0c;如下&#xff1a; Text: [{label: 等于,en_label: Equal,style: noop},{label: 等于其中之一,en_label: Equal to one of,value: one_of,style: tags},{label: 不等于,en_label: Not equal,v…

2024年9月30日历史上的今天大事件早读

1626年9月30日 清太祖努尔哈赤去世 1862年9月30日 德国首任宰相俾斯麦实行“铁血政策” 1887年9月30日 黄河决口 1931年9月30日 国际联盟决议日本撤兵 1937年9月30日 平型关战役结束 1938年9月30日 慕尼黑协议签订 1938年9月30日 前中华民国国务总理唐绍仪遇刺身亡 1941…

使用three.js 实现测量

使用three.js 实现测量 在线预览https://threehub.cn/#/codeMirror?navigationThreeJS&classifyapplication&idlineMeasure 在 https://threehub.cn 中还有很多案例 <!doctype html> <html lang"en"> <head> <meta charset"U…

实验四 IEEE 802.3协议分析和以太网

实验四 IEEE 802.3协议分析和以太网 一、实验目的 1、分析802.3协议 2、熟悉以太网帧的格式 二、实验内容及结果 1、俘获并分析以太网帧 &#xff08;1&#xff09;清空浏览器缓存&#xff08;在IE窗口中&#xff0c;选择“工具/Internet选项/删除文件”命令&#xff09;。 &…

【linux 多进程并发】linux进程状态与生命周期各阶段转换,进程状态查看分析,助力高性能优化

0102 Linux进程生命周期 ​专栏内容&#xff1a; postgresql使用入门基础手写数据库toadb并发编程 个人主页&#xff1a;我的主页 管理社区&#xff1a;开源数据库 座右铭&#xff1a;天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物. 文章目录…

spring security 超详细使用教程(接入springboot、前后端分离)

Spring Security 是一个强大且可扩展的框架&#xff0c;用于保护 Java 应用程序&#xff0c;尤其是基于 Spring 的应用。它提供了身份验证&#xff08;验证用户身份&#xff09;、授权&#xff08;管理用户权限&#xff09;和防护机制&#xff08;如 CSRF 保护和防止会话劫持&a…

变压吸附制氧机:原理、应用与优势

变压吸附制氧机(Pressure Swing Adsorption Oxygen Generator&#xff0c;简称PSA制氧机)是一种利用变压吸附技术从空气中分离出氧气的设备。 一、基本原理 变压吸附制氧机的基本原理基于不同气体在吸附剂上的吸附能力随压力变化的特性。当原料空气经过鼓风机增压后&#xff0c…

鸿蒙harmonyos next flutter通信之MethodChannel获取设备信息

本文将通过MethodChannel获取设备信息&#xff0c;以此来演练MethodChannel用法。 建立channel flutter代码&#xff1a; MethodChannel methodChannel MethodChannel("com.xmg.test"); ohos代码&#xff1a; private channel: MethodChannel | null nullthis.c…

面试题:通过队列实现栈

题目&#xff1a; 请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff09;的栈&#xff0c;并支持普通栈的全部四种操作&#xff08;push、top、pop 和 empty&#xff09;。 实现 MyStack 类&#xff1a; void push(int x) 将元素 x 压入栈顶。int pop() 移除并返…