canal五部曲-canal是如何处理insert幂等性的

news2025/1/19 20:22:09

canal使用了Rocketmq来接收mysql采集的binlog的事件,做到采集和处理的解耦。同时满足一次采集多方消费的需求。那么既然使用到Rocketmq就一定会存在MQ消费超时或是处理失败MQ重发的问题。

那么canal是如何处理MQ重复消费幂等性问题的呢

一般,在业务上我们都会为每个消息生成一个uuid来标记这条消息的唯一性。在消费时业务表增加uuid字段或是MQ唯一表来判断是否已经处理过这条消息,如果消费过了就直接回给MQ ack。
但我们定义的t_user表中并没有用于检查唯一性的uuid字段。那canal是如何做的呢。
首先从canal接收RocketMQ的代码开始分析。
canal正对消费方做了不同的adapter实现,例:RdbAdapter、ESAdapter、HbaseAdapter
我们使用的mysql数据库,直接分析RdbAdapter

    /**
     * 同步方法
     *
     * @param dmls 数据包
     */
    @Override
    public void sync(List<Dml> dmls) {
        if (dmls == null || dmls.isEmpty()) {
            return;
        }
        try {
            //rdb同步服务
            rdbSyncService.sync(mappingConfigCache, dmls, envProperties);
            rdbMirrorDbSyncService.sync(dmls);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

RdbSyncService

    //dmlsPartition这里不展开分析,下文做分析
    futures.add(executorThreads[i].submit(() -> {
        try {
            //通过多线程并行执行dmlsPartition的里dml
            dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
                syncItem.config,
                syncItem.singleDml));
            dmlsPartition[j].clear();
            batchExecutors[j].commit();
            return true;
        } catch (Throwable e) {
            dmlsPartition[j].clear();
            batchExecutors[j].rollback();
            throw new RuntimeException(e);
        }
    }));
    /**
     * 单条 dml 同步
     *
     * @param batchExecutor 批量事务执行器
     * @param config 对应配置对象
     * @param dml DML
     */
    public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
        if (config != null) {
            try {
                String type = dml.getType();
                if (type != null && type.equalsIgnoreCase("INSERT")) {
                    //直接分析insert
                    insert(batchExecutor, config, dml);
                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
                    update(batchExecutor, config, dml);
                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                    delete(batchExecutor, config, dml);
                } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
                    truncate(batchExecutor, config);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

canal在insert时,出现主键冲突时走了SQLException。skipDupException默认是=true,直接忽略了这个异常

    /**
     * 插入操作
     *
     * @param config 配置项
     * @param dml DML数据
     */
    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
        Map<String, Object> data = dml.getData();
        if (data == null || data.isEmpty()) {
            return;
        }

        DbMapping dbMapping = config.getDbMapping();

        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

        StringBuilder insertSql = new StringBuilder();
        insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");

        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append("`")
            .append(targetColumnName)
            .append("`")
            .append(","));
        int len = insertSql.length();
        insertSql.delete(len - 1, len).append(") VALUES (");
        int mapLen = columnsMap.size();
        for (int i = 0; i < mapLen; i++) {
            insertSql.append("?,");
        }
        len = insertSql.length();
        insertSql.delete(len - 1, len).append(")");

        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

        List<Map<String, ?>> values = new ArrayList<>();
        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
            String targetColumnName = entry.getKey();
            String srcColumnName = entry.getValue();
            if (srcColumnName == null) {
                srcColumnName = Util.cleanColumn(targetColumnName);
            }

            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
            if (type == null) {
                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
            }
            Object value = data.get(srcColumnName);
            BatchExecutor.setValue(values, type, value);
        }

        try {
            batchExecutor.execute(insertSql.toString(), values);
        } catch (SQLException e) {
            if (skipDupException
                && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001:"))) {
                // ignore
                // TODO 增加更多关系数据库的主键冲突的错误码
            } else {
                throw e;
            }
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Insert into target table, sql: {}", insertSql);
        }

    }

结论

canal在处理MQ重复消费insert事件时,使用的是忽略的方式。当数据库存在这条数据时,数据库会返回Duplicate entry告诉canal这条数据已经在数据库里了。canal直接回复MQ ack就行了。

扩展: insert的批量插入

在源数据库中执行一条批量插入的sql,canal是怎么进行同步的。

insert into t_user (username,password,create_time,sex)
values ('1','1','2020-10-10',1) , ('1','1','2020-10-10',1);

回到canal的RdbAdapter的批量同步方法

    /**
     * 批量同步
     *
     * @param mappingConfig 配置集合
     * @param dmls 批量 DML
     */
    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
        sync(dmls, dml -> {
            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
                // DDL
            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
            return false;
        } else {
            // DML
            ......

            for (MappingConfig config : configMap.values()) {
                boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
                if (config.getConcurrent()) {
                    //将批量的多个values数据转换成了一条条单个的insert
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
                    singleDmls.forEach(singleDml -> {
                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                } else {
                    int hash = 0;
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
                    singleDmls.forEach(singleDml -> {
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                }
            }
            return true;
        }
    }   );
    }

在这里插入图片描述

继续分析dmlsPartition的作用

关键代码

    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
    singleDmls.forEach(singleDml -> {
        int hash = pkHash(config.getDbMapping(), singleDml.getData());
        SyncItem syncItem = new SyncItem(config, singleDml);
        dmlsPartition[hash].add(syncItem);
    });

canal将单条批量insert的sql,转换成了多条单个的insert。并将每条的主键pk和处理线程数threads做hash(pk % threads)放入不通的分区,多线程执行提高canal的处理能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1130846.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络安全保险行业面临的挑战与变革

保险业内大多数资产类别的数据可以追溯到几个世纪以前&#xff1b;然而&#xff0c;网络安全保险业仍处于初级阶段。由于勒索软件攻击、高度复杂的黑客和昂贵的数据泄漏事件不断增加&#xff0c;许多网络安全保险提供商开始感到害怕继续承保更多业务。 保险行业 根据最近的路…

电脑每过几天就要自动更新,一键教你解决

windows更新&#xff0c;暂停更新最多只能选5个周&#xff0c;还是很麻烦&#xff0c;那么看下图&#xff0c;教你如何改为10年后&#xff1b; 建立一个txt文件&#xff0c;复制下述内容&#xff0c;然后保存&#xff0c;再把后缀名改为.reg,运行一下就可以轻松改为10年了&…

Python爬虫网易云音乐,Tkinter制作音乐播放器

目录 一、效果展示 二、环境 三、实现过程 四、源码 一、效果展示 页面的美化以及功能还有待升级~ 先来说一下已有功能吧&#xff1a; 可以在搜索框中通过歌曲或歌手名称进行搜索&#xff0c;效果和在网易云官网搜索一样。 点击开始下载&#xff0c;就会将搜索结果的第一…

小程序实现圆环进度

一&#xff1a;需求 小程序中要展示进度&#xff0c;要求类似示例图&#xff0c;用圆环形式展示进度&#xff0c;那这该如何实现呢&#xff1f;这一篇文章主要讲的就是这样一个功能。 二&#xff1a;实现 实现的大致流程是把圆环进度条封装成一个组件&#xff0c;然后在需要使…

【开源框架】Glide的图片加载流程

引入依赖 以下的所有分析都是基于此版本的Glide分析 //引入第三方库glide implementation com.github.bumptech.glide:glide:4.11.0 annotationProcessor com.github.bumptech.glide:compiler:4.11.0分析 Glide的使用就是短短的一行代码 Glide.with(this).load("xxx&q…

Dell R720服务器已有win10系统下安装Ubuntu10.04双系统

先在win10下进磁盘管理&#xff0c;分配空间 重启电脑&#xff0c;开机时按F11进入BIOS。 one-shot boot&#xff0c;选U盘 datatraveler 我原来装的是ubuntu18&#xff0c;ubuntu18升级成了ubuntu20&#xff0c;但是apt-get upgrade有很多问题&#xff0c;所以只能重装。 …

数据结构与算法之矩阵: Leetcode 134. 螺旋矩阵 (Typescript版)

螺旋矩阵 https://leetcode.cn/problems/spiral-matrix/ 描述 给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 示例 1 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;[1,2,3,6,9,8,7,4,5]示…

第19章 Dubbo

本文中所有的原理及流程都是针对Dubbo3.0.2.1版本 19.1 谈谈你对Dubbo的理解 难度:★★★★ 重点:★★ 白话解析 1、背景:参考18.13题,这里不在赘述。 2、简介:Dubbo在3.x版本之前都只是一个高性能的RPC框架,但是在3.x版本之后,官网的描述变了,Dubbo已经升级成一个等…

ke8学校陈老师H5

目录 例一&#xff1a; 1label for与表单元素建立关联 2鼠标选中区域 3classlist属性&#xff1a;更换类选择器。添加、删除、切换和查询一个元素上的类。 4nextElementSibling属性 5title属性&#xff1a;鼠标放上去会有提示信息 6placeholder属性&#xff1a;填了就有…

Linux常用命令——cksum命令

在线Linux命令查询工具 cksum 检查文件的CRC是否正确 补充说明 cksum命令是检查文件的CRC是否正确&#xff0c;确保文件从一个系统传输到另一个系统的过程中不被损坏。这种方法要求校验和在源系统中被计算出来&#xff0c;在目的系统中又被计算一次&#xff0c;两个数字进行…

【RuoYi-Vue-Plus】学习笔记 50 - 集成 JSEncrypt 实现请求加密传输(源码)

文章目录 前言框架版本前端服务端 框架集成前端集成1、总览2、代码实现服务端集成1、总览2、代码实现2.1、配置信息 application.yml2.2、配置类 ApiDecryptProperties2.3、过滤器 CryptoFilter2.4、包装类 DecryptRequestBodyWrapper2.5、加解密工具类 EncryptUtils2.6、自动装…

Windows环境部署流媒体服务器ZLMediaKit

参考资料 快速开始 ZLMediaKit/ZLMediaKit Wiki GitHub 环境准备 序号名称版本作用下载地址1Microsoft Visual Studio链接&#xff1a;https://pan.baidu.com/s/1DoWjNZ72Y8YpGpSTY0CNKw 提取码&#xff1a;pv6a2opensslWin32/Win64 OpenSSL Installer for Windows - Shi…

聚观早报 | vivo Y100官宣;极氪001 FR将上市

【聚观365】10月25日消息 vivo Y100官宣 一极氪001 FR将上市 特斯拉加速扩张 苹果扩大招聘力度 小米澎湃OS实现历史性跨越 vivo Y100官宣 vivo Y系列是vivo存在比较久的入门系列&#xff0c;主打千元价位的线下市场&#xff0c;在消费者中有着不错的口碑。而不久前一款型…

R语言代码示例

以下是一个使用R语言和httrOAuth库的下载器程序&#xff0c;用于下载的内容。程序使用以下代码。 # 安装和加载必要的库 install.packages("httr") install.packages("httrOAuth") library(httr) library(httrOAuth) ​ # 设置 http_proxy <- "du…

10 个最佳免费 PDF 压缩工具软件

PDF 是一种全球流行的文件格式&#xff0c;可在不损失质量或文本对齐的情况下传输文档。问题是许多文件共享应用程序和网站限制您可以共享或上传的 PDF 的大小。 10 个最佳免费 PDF 压缩工具软件 在这种情况下&#xff0c;您将需要一个可以为您减小 PDF 文件大小的应用程序。P…

Kafka磁盘写满日志清理操作

最近项目组的kafka集群&#xff0c;老是由于应用端写入kafka topic的消息太多&#xff0c;导致所在的broker节点占满&#xff0c;导致其他的组件接连宕机。 这里和应用端沟通可以删除1天之前的消息来清理磁盘&#xff0c;并且可以调整topic的消息存活时间。 一、调整Topic的消…

手写 Promise(2)实例方法与静态方法的实现

一&#xff1a;什么是 Promise Promise 是异步编程的一种解决方案&#xff0c;其实是一个构造函数&#xff0c;自己身上有all、reject、resolve这几个方法&#xff0c;原型上有then、catch等方法。 Promise对象有以下两个特点。 &#xff08;1&#xff09;对象的状态不受…

[③ADRV902x]: Digital Filter Configuration(接收端)

前言 本篇博客主要总结了ADRV9029 Rx接收端链路中各个滤波器的配置。配置不同的滤波器系数以及不同的参数&#xff0c;可以对输入的数字信号灵活得做decimation处理&#xff0c;decimation信号抽取&#xff0c;就是降低信号采样率的过程。 Receiver Signal Path 下图为接收端…

macbook2024免费mac系统优化清理软件CleanMyMac X

清理电脑的操作系统可能是我们一直以来的习惯&#xff0c;从windows系统到mac系统&#xff0c;我们一直在寻求最好的清理方法&#xff0c;能够有效地清理操作系统对于电脑来说是非常重要的。今天小编想和大家一起讨论使用在macbook上的清理软件&#xff0c;清理macbook的空间可…

在React中,什么是状态(state)?如何更新组件的状态?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…