106 基于消息队列来做 mysql 大数据表数据的遍历处理

news2025/2/25 17:55:45

前言

最近有这样的一个需求, 我们存在一张 很大的 mysql 数据表, 数据量大概是在 六百万左右 

然后 需要获取所有的记录, 将数据传输到 es 中 

然后 当时 我就写了一个脚本来读取 这张大表, 然后 分页获取数据, 然后 按页进行数据处理 转换到 es 

但是存在的问题是, 前面 还效率还可以, 但是 约到后面, 大概是到 三百多页, 的时候 从 mysql 读取数据 已经快不行了 

十分耗时, 这里就是 记录这个问题的 另外的处理方式 

我这里的处理是基于 消息中间件, 从 mysql 通过 datax/spoon 传输数据到 kafka 很快 

然后  java 程序从 kafka 中消费队列的数据 也很快, 最终 六百万的数据 读取 + 处理 合计差不多是 一个多小时完成, 其中处理 有一部分地方 业务上面比较耗时 

 

 

待处理的数据表

待处理的数据表如下, 里面合计 600w 的数据 

CREATE TABLE `student_all` (
  `id` int NOT NULL AUTO_INCREMENT,
  `field0` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field1` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field2` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field3` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field4` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field5` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field6` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field7` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field8` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field9` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field10` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field11` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field12` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field13` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field14` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field15` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field16` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field17` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field18` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field19` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field20` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field21` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field22` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field23` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field24` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field25` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field26` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field27` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field28` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field29` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `CREATED_AT` bigint NOT NULL,
  `UPDATED_AT` bigint NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4379001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci

 

 

基于 mysql 的数据分页处理

基于 mysql 的处理程序如下, 就是一个简单的 mysql 分页 

然后将需要提取的数据封装, 然后 批量提交给 es 

总的情况来说是 前面的一部分页是可以 很快的响应数据, 但是 越到后面, mysql 服务器越慢 

/**
 * Test05PostQy2Es
 *
 * @author Jerry.X.He
 * @version 1.0
 * @date 2022/11/21 16:00
 */
public class Test05PostEsFromMysql {

    private static String mysqlUrl = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&autoReconnectForPools=true";
    private static String mysqlUsername = "postgres";
    private static String mysqlPassword = "postgres";
    private static JdbcTemplate mysqlJdbcTemplate = JdbcTemplateUtils.getJdbcTemplate(mysqlUrl, mysqlUsername, mysqlPassword);

    private static RestHighLevelClient esClient = getEsClient();
    private static IndicesClient indicesClient = esClient.indices();

    // Test05PostQy2Es
    public static void main(String[] args) throws Exception {

        String esIndexName = "student_all_20221211";
        bulkEsData(esIndexName);

    }

    private static void bulkEsData(String esIndexName) throws Exception {
        String queryDbTableName = "student_all";
        List<String> fieldList = Arrays.asList("id", "field0", "field1", "field2", "field3", "field4", "field5", "field6", "field7", "field8", "field9", "field10", "field11", "field12", "field13", "field14", "field15", "field16", "field17", "field18", "field19", "field20", "field21", "field22", "field23", "field24", "field25", "field26", "field27", "field28", "field29", "CREATED_AT", "UPDATED_AT");

        String idKey = "id";
        String whereCond = "";
//        String orderBy = "order by id asc";
        String orderBy = "";
        AtomicInteger counter = new AtomicInteger(0);
        int pageSize = 1000;
        int startPage = 0;
        pageDo(queryDbTableName, whereCond, orderBy, pageSize, startPage, (pageNo, list) -> {
            BulkRequest bulkRequest = new BulkRequest();
            for (Map<String, Object> entity : list) {
                IndexRequest indexRequest = new IndexRequest(esIndexName);
                Map<String, Object> sourceMap = new LinkedHashMap<>();
                List<String> allFieldsListed = new ArrayList<>();
                for (String fieldName : fieldList) {
                    String fieldValue = String.valueOf(entity.get(fieldName));
                    sourceMap.put(fieldName, fieldValue);
                    allFieldsListed.add(Objects.toString(fieldValue, ""));
                }
                String id = String.valueOf(entity.get(idKey));
                indexRequest.id(id);
                sourceMap.put("_allFields", StringUtils.join(allFieldsListed, "$$"));

                indexRequest.source(sourceMap);
                bulkRequest.add(indexRequest);
            }

            try {
                BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                counter.addAndGet(list.size());
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(" page : " + pageNo + ", flushed " + counter.get() + " records ");
        });
    }

    private static void pageDo(String tableName, String whereCond, String orderBy, int pageSize, int startPage,
                               BiConsumer<Integer, List<Map<String, Object>>> func) {
        if (StringUtils.isNotBlank(whereCond) && (!whereCond.trim().toLowerCase().startsWith("where"))) {
            whereCond = " where " + whereCond;
        }
        if (StringUtils.isNotBlank(orderBy) && (!orderBy.trim().toLowerCase().startsWith("order"))) {
            orderBy = " order by " + orderBy;
        }

        String queryCountSql = String.format(" select count(*) from %s %s %s", tableName, whereCond, orderBy);
        Integer totalCount = mysqlJdbcTemplate.queryForObject(queryCountSql, Integer.class);
        Integer totalPage = (totalCount == null || totalCount == 0) ? 0 : (totalCount - 1) / pageSize + 1;
        for (int i = startPage; i < totalPage; i++) {
            int offset = i * pageSize;
            String queryPageSql = String.format(" select * from %s %s %s limit %s,%s ", tableName, whereCond, orderBy, offset, pageSize);
            List<Map<String, Object>> list = mysqlJdbcTemplate.queryForList(queryPageSql);
            func.accept(i, list);
        }
    }

}

 

 

基于中间件 kafka 的处理

首先通过 spoon/datax 将数据从 mysql 转换到 kafka 

然后 再由脚本从 kafka 消费数据, 处理 传输到 es 中 

入了一次 消息队列之后, 然后程序 再来消费, 就会快很多了, 消息队列本身功能比较单纯 比较适合于做做顺序遍历 就会有优势一些 

 

这里以 spoon 将数据从 mysql 转换到 kafka 

我这里 本地环境 内存等什么的都不足, 因此是 一分钟 入库三万条, 但是 实际生产环境 会很快 

在生产环境 五百多w 的数据, 基于 datax 传输 mysql 到 kafka, 差不多是 五六分钟 就可以了 

e3cb2b641cfe4d208e11040f1b5fbc2a.png

 

 

基于 kafka 将数据传输到 es 

如下程序 仅仅是将 kafka 中的数据 原样照搬过去了, 但是 实际的场景 中会做一些 额外的业务处理, 这里仅仅是为了 演示 

/**
 * Test05PostQy2Es
 *
 * @author Jerry.X.He
 * @version 1.0
 * @date 2022/11/21 16:00
 */
public class Test05PostEsFromKafka {

    private static RestHighLevelClient esClient = getEsClient();
    private static IndicesClient indicesClient = esClient.indices();
    private static String esIndexName = "student_all_20221211";
    private static String groupId = "group-01";

    // Test05PostQy2Es
    public static void main(String[] args) throws Exception {

        bulkKafka2EsData(esIndexName, groupId);

    }

    private static void bulkKafka2EsData(String esIndexName, String groupId) throws Exception {
        List<Pair<String, String>> hjk2StdFieldMap = hjk2StdFieldMap();
        Properties properties = kafkaProperties(groupId);

        String idKey = "ID";
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("STUDENT_ALL_20221211"));
        AtomicInteger counter = new AtomicInteger(0);
        long start = System.currentTimeMillis();
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            if (records.isEmpty()) {
                Thread.sleep(10 * 1000);
                long spent = System.currentTimeMillis() - start;
                System.out.println(" spent : " + (spent / 1000) + " s ");
                continue;
            }

            BulkRequest bulkRequest = new BulkRequest();
            boolean isEmpty = true;
            for (ConsumerRecord<String, String> record : records) {
                IndexRequest indexRequest = new IndexRequest(esIndexName);
                String value = record.value();
                JSONObject entity = JSON.parseObject(value);

                // 获取 id
                String id = StringUtils.defaultIfBlank(entity.getString(idKey), "");
                if (isFilterByQy(id)) {
                    continue;
                }

                Map<String, Object> sourceMap = new LinkedHashMap<>();
                List<String> allFieldsListed = new ArrayList<>();
                for (Pair<String, String> entry : hjk2StdFieldMap) {
                    String hjkKey = entry.getKey(), stdKey = entry.getValue();
                    String fieldValue = StringUtils.defaultIfBlank(entity.getString(hjkKey), "");
                    sourceMap.put(stdKey, fieldValue);
                    allFieldsListed.add(Objects.toString(fieldValue, ""));
                }
                indexRequest.id(id);
                sourceMap.put("_allFields", StringUtils.join(allFieldsListed, "$$"));

                isEmpty = false;
                indexRequest.source(sourceMap);
                bulkRequest.add(indexRequest);
            }
            if (isEmpty) {
                continue;
            }

            try {
                BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                counter.addAndGet(bulkRequest.requests().size());
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(" flushed " + counter.get() + " records ");
        }

    }

    private static List<Pair<String, String>> hjk2StdFieldMap() {
        List<Pair<String, String>> hjk2StdFieldMap = new ArrayList<>();
        hjk2StdFieldMap.add(new ImmutablePair<>("id", "id"));
        hjk2StdFieldMap.add(new ImmutablePair<>("CREATED_AT", "CREATED_AT"));
        hjk2StdFieldMap.add(new ImmutablePair<>("UPDATED_AT", "UPDATED_AT"));
        for (int i = 0; i < Test05CreateMysqlBigTable.maxFieldIdx; i++) {
            String fieldName = String.format("field%s", i);
            hjk2StdFieldMap.add(new ImmutablePair<>(fieldName, fieldName));
        }
        return hjk2StdFieldMap;
    }

    private static Properties kafkaProperties(String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.0.190:9092");
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    private static boolean isFilterByQy(String qy) {
        if (StringUtils.isBlank(qy)) {
            return true;
        }

        return false;
    }

}

 

 

spoon 安装 kakfa 插件

来自 Kettle安装Kafka Consumer和Kafka Producer插件

    1.从github上下载kettle的kafka插件,地址如下
    Kafka Consumer地址:
    https://github.com/RuckusWirelessIL/pentaho-kafka-consumer/releases/tag/v1.7
    Kafka Producer地址:
    https://github.com/RuckusWirelessIL/pentaho-kafka-producer/releases/tag/v1.9
    2.进入 kettle 安装目录:在plugin目录下创建steps目录
    3.把下载的插件解压后放到 steps 目录下
    5.重启 spoon.bat 即可

 

 

 

 

参考

Kettle安装Kafka Consumer和Kafka Producer插件

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1528850.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CTF题型 SSTI(2) Flask-SSTI典型题巩固

CTF题型 SSTI(2) Flask-SSTI典型题巩固 文章目录 CTF题型 SSTI(2) Flask-SSTI典型题巩固前记1.klf__sstiSSTI_Fuzz字典&#xff08;网上收集自己补充&#xff09; 2.klf_2数字问题如何解决了&#xff1f;|count |length都被禁&#xff1f; 3.klf_3 前记 从基础到自己构造paylo…

java String的深入了解

1、String 概述 &#xff08;1&#xff09;String 类在 java.lang 包下&#xff0c;所以使用的时候不需要导包。 &#xff08;2&#xff09;String 类代表字符串&#xff0c;Java程序中的所有字符串文字&#xff08;例如“abc”&#xff09;都被实现为此类的实例。也就是说&a…

【漏洞复现】用友U8Cloud base64 SQL注入漏洞

0x01 产品简介 用友U8 Cloud是用友推出的新一代云ERP&#xff0c;主要聚焦成长型、创新型企业&#xff0c;提供企业级云ERP整体解决方案。 0x02 漏洞概述 用友U8 Cloud 存在SQL注入漏洞&#xff0c;未授权的攻击者可通过此漏洞获取数据库权限&#xff0c;从而盗取用户数据&a…

Leetcode 48. 旋转图像

心路历程&#xff1a; 第一次需要这种类型的题。 一开始从双指针、递归、栈队的角度去思考问题&#xff0c;没有发现明显的特征。 后来想到这个算是二维数组问题&#xff0c;应该也是双指针的一种。 总感觉有什么妙招可以一下子解决&#xff0c;但是没想出来就去找的网上的答案…

Android 之 GMS 认证知多少?

GMS认证 1.什么是GMS GMS全称Google Mobile Service&#xff0c;谷歌移动服务。 为什么要通过GMS认证 Android 系统是开源的&#xff0c;但是 Google 针对GMS所提供的服务却是收费的&#xff0c;比如Google Map&#xff0c;Google Play&#xff0c;Youtube&#xff0c;Gmai…

Stable Diffusion + Segment Anything试用

安装 从continue-revolution/sd-webui-segment-anything安装插件分割模型下载后放到这个位置&#xff1a;${sd-webui}/extension/sd-webui-segment-anything/models/sam下&#xff0c;可以下载3个不同大小的模型&#xff0c;从大到小如下&#xff1a;vit_h is 2.56GB, vit_l i…

嵌入式汇编语言简介

嵌入式汇编语言是一种在嵌入式系统开发中广泛使用的编程语言&#xff0c;它直接操作底层硬件资源&#xff0c;具有高效性和灵活性。本文将介绍嵌入式汇编语言的基本概念、特点以及应用场景。 以下是我整理的关于嵌入式开发的一些入门级资料&#xff0c;免费分享给大家&#xf…

零售饮料企业通过精准铺货与动态调整,结合指标平台的智能分析,实现对线下渠道的全面掌控

作为一名消费者&#xff0c;炎热的夏天我们会走进一家便利店&#xff0c;从冰柜中选出一瓶汽水&#xff1b;下午工作有点累了&#xff0c;我们会在公司的自动贩卖机扫码买一瓶快乐水......零售品牌从线上到线下渠道都开展了激烈的竞争&#xff0c;从供应链、物流到销售环节&…

转座子插入序列分析1-GENE-IS分析管道

如果你使用 GENE-IS: Saira Afzal et al。 &#xff0c;2016请引用这篇研究文章。GENE-IS: time-efficient and accurate analysis of viral integration events in large-scale gene therapy data. Molecular Therapy - Nucleic Acids 2016, vol. 6:133-139. DOI:https://doi.…

规划系列的常见术语:龙格现象、控制点、型值点和插值点、规划控制的开环、闭环、前馈、反馈与重规划

参考b站大佬Ally的规划控制系列 1 龙格现象 1.1 初探龙格现象 龙格现象由德国数学家Carl Runge&#xff08;卡尔龙格&#xff09;于1901年发现&#xff0c;龙格函数定义为&#xff1a; f ( x ) 1 25 x 2 1 f(x)\frac{1}{25 x^{2}1} f(x)25x211​ 我们在 [ − 1 , 1 ] [-1…

MNN createRuntime(二)

系列文章目录 MNN createFromBuffer&#xff08;一&#xff09; MNN createRuntime&#xff08;二&#xff09; MNN createSession 之 Schedule&#xff08;三&#xff09; MNN createSession 之创建流水线后端&#xff08;四&#xff09; MNN Session::resize 之流水线编码&am…

后端开发要不要转鸿蒙?

看到一条很有意思的提问&#xff0c;互联网太卷了&#xff0c;熬过了2023才发现&#xff0c;2024更难熬&#xff01;只因行业发展多年&#xff0c;人才过度饱和&#xff01;那后端的出路在哪里&#xff1f; 我推荐大家学【鸿蒙应用开发】新兴行业需求大&#xff0c;各大厂都在…

AI大模型智能大气科学探索之:ChatGPT在大气科学领域建模、数据分析、可视化与资源评估中的高效应用及论文写作

本文深度探讨人工智能在大气科学中的应用&#xff0c;特别是如何结合最新AI模型与Python技术处理和分析气候数据。课程介绍包括GPT-4等先进AI工具&#xff0c;旨在帮助大家掌握这些工具的功能及应用范围。本文内容覆盖使用GPT处理数据、生成论文摘要、文献综述、技术方法分析等…

camunda流程引擎事务管理和乐观锁

本文重点介绍camunda开源流程引擎的事务配置&#xff0c;以及在高并发多线程情况下&#xff0c;可能会发生多个线程尝试对相同流程实例数据进行更改的情况&#xff0c;Camunda如何通过数据库的乐观锁解决这种并发冲突的&#xff0c;并介绍了乐观锁和悲观锁的适用场景、性能影响…

MySQL数据库概念及安装

一、数据库的基本概率 1.1 数据 记录每个人的信息 或者记录数据 1.2 表 存放信息的集合 或者存放行和列的信息 1.3 数据库 表的集合 二、数据库管理系统&#xff08;DBMS&#xff09; 2.1 DBMS定义 &#xff08;DBMS&#xff09;是一种软件&#xff0c;用于创建和管理…

【Python脚本随手笔记】 --- 复制文件并修改权限

&#x1f48c; 所属专栏&#xff1a;【Python脚本随手笔记】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &#…

HarmonyOS 通知意图

之前的文章 我们讲了 harmonyos 中的 基础和进度条通知 那么 今天 我们来说说 任何给通知添加意图 通知意图 简单说 就是 当我们点击某个通知 如下图 然后 就会拉起某个 应用 就例如说 我们某个微信好友发消息给我们 我们 点击系统通知 可以直接跳到你们的聊天界面 好 回到…

Vue+SpringBoot打造民宿预定管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 用例设计2.2 功能设计2.2.1 租客角色2.2.2 房主角色2.2.3 系统管理员角色 三、系统展示四、核心代码4.1 查询民宿4.2 新增民宿4.3 新增民宿评价4.4 查询留言4.5 新增民宿订单 五、免责说明 一、摘要 1.1 项目介绍 基于…

VsCode中高效书写Vue3代码的插件

Vue-Official&#xff08;原Volar&#xff09; 就是原先的Volar&#xff0c;现已弃用。 Vue-Official 提供的功能&#xff1a; 语法高亮: Vue-Official 扩展可以为 Vue 单文件组件&#xff08;.vue 文件&#xff09;中的 HTML、CSS 和 JavaScript 部分提供语法高亮&#xff…

linux系统------------Mysql数据库

目录 一、数据库基本概念 1.1数据(Data) 1.2表 1.3数据库 1.4数据库管理系统(DBMS) 数据库管理系统DBMS原理 1.5数据库系统&#xff08;DBS) 二、数据库发展史 1、第一代数据库 2、第二代数据库 3、第三代数据库 三、关系型数据库 3.1关系型数据库应用 3.2主流的…