分页多线程处理大批量数据

news2024/9/22 15:27:07

1.业务场景

因为需要从一个返利明细表中获取大量的数据,生成返利报告,耗时相对较久,作为后台任务执行。但是后台任务如果不用多线程处理,也会要很长时间才能处理完。

另外考虑到数据量大,不能一次查询所有数据在内存中处理,为了防止内存溢出,分页查询数据,然后分批次多线程处理。

主要思想是采取分治的思想,首先分页查询数据,然后每页数据分成均匀的不同片段,多个线程处理这些片段,一个线程处理一个片段,可以加上等待的同步计数器,让这一页数据全部处理完后再去查询下一页的数据。

2.关键代码

//线程池配置
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10,
            10,
            10L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200), new ThreadPoolExecutor.CallerRunsPolicy());

   

public String generateReport(String periodType, String monthWid, String quarterWid) {
        int totalNum = 0;
        //计时器
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        try {
            //这里省略了一些其他的逻辑,只关注分页查询然后多线程任务处理的逻辑......
            //查询总数量
            totalNum = getReportTotalNum(periodType, monthWid, quarterWid, totalNum);
            int pageIndex = 0;
            int pageSize = 500;
            int pageNum = 1;
            StoreRebateDetailForReportQueryReq req = null;
            while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {//分页查询,每页500条数据
                pageIndex = pageSize * (pageNum - 1);
                List<StoreRebateDetail> list = storeRebateDetailService.selectListForRebateReport(pageIndex, pageSize);
                int batchNum = list.size();
                //每个线程处理100条                                                                                       
                int perThreadCount = 100;
                LOGGER.info("开始处理第{}页(共{}条)数据", pageNum, batchNum);
                final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器
                for (int j = 0; j < batchNum; j++) {
                    //每100条一个线程处理
                    if (j % perThreadCount == 0) {
                        int start = j;
                        int end = (batchNum - j) >= perThreadCount ? (j + perThreadCount) : batchNum;
                        int pageNums = pageNum;
                        poolExecutor.submit(()->{
                            LOGGER.info("第{}页的第{}-{}条数据处理开始", pageNums, start+1, end);
                            //处理比较复杂的业务逻辑(耗时较久)
                            processInsert(list, start, end);
                            LOGGER.info("第{}页的第{}-{}条数据处理结束", pageNums, start+1, end);
                            cdl.countDown();
                        });
                    }
                }
                cdl.await();
                pageNum++;
            }
            stopWatch.stop();
            double totalTimeSeconds = stopWatch.getTotalTimeSeconds();
            result.put("syncStatus", "success");
            result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");
            return SToolUtils.convertResultJSONObj(CommonAbstractService.SUCCESS_STATUS, "处理成功", totalNum, new JSONArray().fluentAdd(result)).toString();
        } catch (Exception e) {
            stopWatch.stop();
            double totalTimeSeconds = stopWatch.getTotalTimeSeconds();
            LOGGER.error("调度处理异常:{}--{}", e.getMessage(), e);
            result.put("syncStatus", "fail");
            result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");
            return SToolUtils.convertResultJSONObj(CommonAbstractService.ERROR_STATUS, "处理异常", 0, new JSONArray().fluentAdd(result)).toString();
        } finally {
            //做业务需要处理的,可以没有
        }
    }

后面改了个通用版,采用接口中的默认方法实现主要公共逻辑,其他几个需要不同实现的方法让子类去实现。

batchProcess方法为主要处理逻辑入口方法,供其子类继承,子类需要传递线程池、每页大小、每个线程处理的条数、查询数据的参数等参数。

processLongTimeLogic方法为处理时间比较长,需要多线程去执行的逻辑,子类直接覆写这个方法,将复杂的耗时比较长的业务逻辑放在里面就可以了。

queryTotalNum方法为查询总记录数的方法,子类去具体实现查询逻辑,查询数量是为了后续分页处理。

queryDataListByPage方法为分页查询数据的方法,也是子类去实现具体的逻辑,这里的第一个参数list加了泛型处理,<T>为查询数据返回的实体对象类,这样在后续处理的时候就不要去强转类型了。

这样子类只需要关注查询大表的查询逻辑,以及需要处理的具体业务逻辑,而不需要去处理分页和多线程处理的逻辑,这样增加了代码的可读性以及减少了出错的可能性。

public interface BatchProcessService<T> {

    /**
     * 批量处理,分页+多线程处理
     * @param poolExecutor       线程池
     * @param pageSize           每页查询的大小
     * @param perThreadCount     每个线程处理的记录数
     * @param queryTotalNumParam 查询记录总数的参数,必须继承PageReq
     * @param queryDataParam     查询分页列表的参数,必须继承PageReq
     * @param logger             子类的日志对象
     * @param otherParam         其他参数,需要给processLongTimeLogic方法传递的参数
     * @throws InterruptedException
     */
    default int batchProcess(ThreadPoolExecutor poolExecutor, int pageSize, int perThreadCount, Object queryTotalNumParam, PageReq queryDataParam, Logger logger, Map<String, Object> otherParam) throws InterruptedException {
        int pageIndex = 0;
        int pageNum = 1;
        int totalNum = queryTotalNum(queryTotalNumParam);
        if (totalNum == 0) {
            logger.info("需要处理的数据数量为0");
            return 0;
        }
        try {
            while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {
                pageIndex = pageSize * (pageNum - 1);
                queryDataParam.setPageIndex(pageIndex);
                queryDataParam.setPageRows(pageSize);
                List<T> list = queryDataListByPage(queryDataParam);
                int batchNum = list.size();
                final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器
                for (int j = 1; j <= (batchNum % perThreadCount == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); j++) {
                    //每100条一个线程处理
                    int start = perThreadCount * (j - 1);
                    int end = (batchNum - start) >= perThreadCount ? (start + perThreadCount) : batchNum;
                    int pageNums = pageNum;
                    poolExecutor.submit(() -> {
                        logger.info("第{}页的第{}-{}条数据处理开始", pageNums, start + 1, end);
                        //处理其他长时间的逻辑
                        processLongTimeLogic(list.subList(start, end), otherParam);
                        logger.info("第{}页的第{}-{}条数据处理结束", pageNums, start + 1, end);
                        cdl.countDown();
                    });
                }
                cdl.await();
                pageNum++;
            }
        } catch (Exception e) {
            logger.error("批量处理数据异常", e);
            throw e;
        }
        return totalNum;
    }

    /**
     * 查询记录总数
     *
     * @param queryParam
     * @return
     */
    int queryTotalNum(Object queryParam);

    /**
     * 分页查询数据
     *
     * @param queryDataParam
     * @return
     */
    List<T> queryDataListByPage(PageReq queryDataParam);

    /**
     * 处理长时间业务逻辑
     *
     * @param list  处理的数据列表
     * @param otherParam 其他参数
     */
    void processLongTimeLogic(List<T> list, Map<String, Object> otherParam);
}

PageReq类为分页查询参数的父类,里面包含了分页的一些属性,查询参数的实体继承该类就可以了,其他是自己的业务相关的参数。

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

@Getter
@Setter
public class PageReq implements Serializable {

    /**
     * 当前页码
     */
    private Integer pageIndex = 1;

    /**
     * 页大小
     */
    private Integer pageRows = 10;

    public PageReq() {
    }

    public PageReq(Integer pageIndex, Integer pageRows) {
        this.pageIndex = pageIndex;
        this.pageRows = pageRows;
    }

}

3.测试效果

原来跑一个月的数据需要40多分钟,后面通过这样处理后,采用5个线程跑,时间缩短至8分钟左右,相当于差不多时间缩短到原来的1/5。

image-20240320124945462

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1545870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为数通方向HCIP-DataCom H12-821题库(多选题:201-220)

第201题 以下关于BGP中Orginator ID属性的描述,正确的是哪些项? A、Originator ID属于公认任意属性 B、当其他BGP Speaker接收到这条路由的时候,将比较收到的0nginator ID和本地的Router ID,如果两个ID相同BGP Speaker会忽略掉这条路由,不做处理 C、当一条路由第一次被RR…

【目标检测】NMS算法的理论讲解

将NMS就必须先讲IOU&#xff0c; IOU就是交并比&#xff0c;两个检测框的交集除以两个检测框的并集就是IOU 为什么要做NMS操作&#xff0c;因为要去除同一个物体的多的冗余检测框 那么NMS算法是如何做的呢&#xff1f; 以上是算法的流程图 下面讲解算法的流程 首先输入是预…

爬虫Day3

用到的网页--豆瓣电影Top250 需要爬取信息&#xff1a; 数据保存在网页源代码中&#xff0c;是服务加载方式。先拿到网页源代码--request。再通过re提取想要的信息---re。 新知识&#xff1a;用csv存数据&#xff0c;可以用excel表格展示数据 import csv result obj.findite…

串口通信标准RS232 RS485 RS422的区别

RS-232、RS-422、RS-485是关于串口通讯的一个机械和电气接口标准&#xff08;顶多是网络协议中的物理层&#xff09;&#xff0c;不是通讯协议&#xff0c;它们之间的几个不同点如下&#xff1a; 一、硬件管脚接口定义不同 二、工作方式不同 RS232&#xff1a; 3线全双工 RS…

element UI季度选择器的实现

效果展示 用elementUI的select实现季度选择器 代码实现 generateQuarterOption放在methods中&#xff0c;需要近几年的只需要修改第一个循环的次数即可&#xff0c;mounted生命周期函数中调用generateQuarterOption() generateQuarterOption() {//近3年所有季度let now ne…

深入解析以太坊Dencun升级:提升网络性能与安全的关键举措

近年来&#xff0c;以太坊网络一直在不断演进和发展&#xff0c;为了应对日益增长的用户需求和挑战&#xff0c;以太坊社区不断提出并实施各种升级和改进措施。其中&#xff0c;Dencun升级作为最新的一项重大改革&#xff0c;旨在提升以太坊网络的性能和安全性&#xff0c;为其…

护眼台灯有必要买贵的吗?看看业内人士推荐的这五款!

随着学习压力的增大和担心孩子的近视&#xff0c;很多家长朋友们除了培养孩子正确的用眼习惯之外&#xff0c;也开始关注或准备添置学习用的护眼台灯&#xff0c;以缓解学习工作时的用眼疲劳&#xff0c;而相关的护眼灯也成为了市场的热门产品。而市面上护眼灯品牌众多&#xf…

CUDA从入门到放弃(四):CUDA 编程模式 CUDA Programming Model

CUDA从入门到放弃&#xff08;四&#xff09;&#xff1a;CUDA 编程模式 CUDA Programming Model 1 Kernels CUDA C 扩展了 C&#xff0c;允许定义名为内核的函数&#xff0c;这些函数可以被不同的 CUDA 线程并行执行多次&#xff0c;而不是像普通 C 函数那样只执行一次。内核…

【2024.3.26练习】画中漂流

题目描述 题目分析 根据题型分析应该可以用动态规划解决。设为第秒&#xff0c;剩余体力值为&#xff0c;且当前位置距离峡谷米时的总方案数。根据题意&#xff0c;状态转移方程如下&#xff1a; 这样定义状态的话空间复杂度为&#xff0c;大大超出了空间限制。观察转移方程左…

【SpringBoot】实现一个简单的图片上传

前端上传表单 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> </head> <body> <form enctype"multipart/form-data" method"post" action&q…

拓展AI边界:去中心化人工智能的应用场景和主要项目盘点

随着区块链技术的发展和普及&#xff0c;去中心化人工智能&#xff08;AI&#xff09;逐渐成为技术领域的焦点之一。区块链的去中心化特性为AI技术的应用提供了新的可能性&#xff0c;使得数据共享、模型训练和应用部署更加安全、透明和可靠。本文将探索去中心化AI的应用场景&a…

【NLP学习记录】Embedding和EmbeddingBag

Embedding与EmbeddingBag详解 ●&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客 ●&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 ●&#x1f680; 文章来源&#xff1a;K同学的学习圈子1、Embedding详解 Embedding是Pytorch中最基本…

Spring实例化Bean的三种方式

参考资料&#xff1a; Core Technologies 核心技术 spring实例化bean的三种方式 构造器来实例化bean 静态工厂方法实例化bean 非静态工厂方法实例化bean_spring中有参构造器实例化-CSDN博客 1. 构造函数 1.1. 空参构造函数 下面这样表示调用空参构造函数&#xff0c;使用p…

Mysql数据库函数【Mysql】

Mysql数据库函数【Mysql】 前言版权Mysql数据库函数常用函数排序与分页排序分页 单行函数2.数值函数2.1基本函数2.2角度与弧度2.3三角函数2.4指数与对数函数2.5进制间的转换 3.字符串函数4.日期和时间函数4.1获取日期、时间4.2日期与时间戳的转换4.3获取月份、星期、星期数、天…

C语言数据流讲解

目录 4.1 流&#xff08;Stream&#xff09;&#xff1a;数据流动的隐喻 4.1.1 流&#xff1a;数据传输的通用接口 4.1.2 标准流&#xff1a;预定义的流通道 4.2 文件指针&#xff1a;流操作的桥梁 4.2.1 文件指针的本质与结构 4.2.2 使用文件指针操作流 图解 结语 在C…

AI研报:从Sora看多模态大模型发展

《从Sora看多模态大模型发展》的研报来自浙商证券&#xff0c;写于2024年2月。 这篇报告主要探讨了多模态大模型的发展趋势&#xff0c;特别是OpenAI发布的视频生成模型Sora&#xff0c;以及其对行业发展的影响。以下是报告的核心内容概述&#xff1a; Sora模型的发布&#x…

错误 C2872 “byte”: 不明确的符号,在rpcndr.h或者objidl.h

主要问题出在这里面 #include “objbase.h” qtcreator 5.12 可以直接运行 vsqt2022 msvs2017就要报错 错误 C2872 “byte”: 不明确的符号 E:\GGtie\out\build\x64-debug\GGtie C:\Program Files (x86)\Windows Kits\10\include\10.0.22621.0\um\objidl.h 13832 解决方法…

网络七层模型之物理层:理解网络通信的架构(一)

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

C++new与delete函数

CSDN成就一亿技术人 目录 C/C内存分布&#xff1a; 一.C内存管理方式 1.new/delete操作内置类型 2.new和delete操作自定义类型 二.operato new与operator delete函数 1.operator new与operator delete函数 三.new和delete的实现原理 1.内置类型 2.自定义类型 四…

openssl 升级1.1.1.1k 到 3.0.13

下载 https://www.openssl.org/source/ tar -zxvf openssl-3.0.13.tar.gzcd openssl-3.0.13/./config enable-fips --prefix/usr/local --openssldir/usr/local/opensslmake && make install 将原有openssl备份 mv /usr/bin/openssl /usr/bin/openssl.bak mv /usr/i…