canal五部曲-如何保证消息的顺序

news2024/11/28 12:51:33

分析CanalRocketMQProducer.send
canal发送消息到RocketMQ使用到了partitionNum、partitionHash
通过partitionHash可以把消息发送到RocketMQ的不同分区上,因为同一个分区在消费时有序的

    public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
        // 获取当前topic的分区数
        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
            destination.getDynamicTopicPartitionNum());
        if (partitionNum == null) {
            partitionNum = destination.getPartitionsNum();
        }
        if (!mqProperties.isFlatMessage()) {
            ......
        } else {
            // 并发构造
            MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
            // 串行分区
            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
            // 初始化分区合并队列
            if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
                for (int i = 0; i < destination.getPartitionsNum(); i++) {
                    partitionFlatMessages.add(new ArrayList<>());
                }

                for (FlatMessage flatMessage : flatMessages) {
                    FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                        partitionNum,
                        destination.getPartitionHash(),
                        mqProperties.isDatabaseHash());
                    int length = partitionFlatMessage.length;
                    for (int i = 0; i < length; i++) {
                        // 增加null判断,issue #3267
                        if (partitionFlatMessage[i] != null) {
                            partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
                        }
                    }
                }

                ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
                for (int i = 0; i < partitionFlatMessages.size(); i++) {
                    final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
                    if (flatMessagePart != null && flatMessagePart.size() > 0) {
                        final int index = i;
                        template.submit(() -> {
                            List<Message> messages = flatMessagePart.stream()
                                .map(flatMessage -> new Message(topicName,
                                    ((RocketMQProducerConfig) this.mqProperties).getTag(),
                                    JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                                .collect(Collectors.toList());
                            // 批量发送
                            sendMessage(messages, index);
                        });
                    }
                }

                // 批量等所有分区的结果
                template.waitForResult();
            } else {
                final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                List<Message> messages = flatMessages.stream()
                    .map(flatMessage -> new Message(topicName,
                        ((RocketMQProducerConfig) this.mqProperties).getTag(),
                        JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                    .collect(Collectors.toList());
                // 批量发送
                sendMessage(messages, partition);
            }
        }
    }

partitionHash计算公式

Math.abs(pk.hashCode) % partitionsNum

分析计算公式实现MQMessageUtils.messagePartition

    List<String> pkNames = hashMode.pkNames;
    if (hashMode.autoPkHash) {
        pkNames = flatMessage.getPkNames();
    }

    int idx = 0;
    for (Map<String, String> row : flatMessage.getData()) {
        int hashCode = 0;
        if (databaseHash) {
            hashCode = database.hashCode();
        }
        if (pkNames != null) {
            for (String pkName : pkNames) {
                String value = row.get(pkName);
                if (value == null) {
                    value = "";
                }
                hashCode = hashCode ^ value.hashCode();
            }
        }

        int pkHash = Math.abs(hashCode) % partitionsNum;
        // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
        pkHash = Math.abs(pkHash);

RocketMQ client发送消息时指定了partition

    private void sendMessage(Message message, int partition) {
        try {
            SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {
                if (partition >= mqs.size()) {
                    return mqs.get(partition % mqs.size());
                } else {
                    return mqs.get(partition);
                }
            }, null);

            if (logger.isDebugEnabled()) {
                logger.debug("Send Message Result: {}", sendResult);
            }
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

partitionHash表达式如何配置
分析MQMessageUtils.getPartitionHashColumns,从partitionDatas中获取配置

    public static HashMode getPartitionHashColumns(String name, String pkHashConfigs) {
        if (StringUtils.isEmpty(pkHashConfigs)) {
            return null;
        }

        List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
        for (PartitionData data : datas) {
            if (data.simpleName != null) {
                if (data.simpleName.equalsIgnoreCase(name)) {
                    return data.hashMode;
                }
            } else {
                if (data.regexFilter.filter(name)) {
                    return data.hashMode;
                }
            }
        }

        return null;
    }

代码中认为一个冒号后面的表达式为pkHash的表达式。使用$pk$变量名来表示取主键,当然也可以自定义表达式

Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
  .softValues(),
  pkHashConfigs -> {
      List<PartitionData> datas = Lists.newArrayList();

      String[] pkHashConfigArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
          ",",
          ";"),
          ";");
      // schema.table:id^name
      for (String pkHashConfig : pkHashConfigArray) {
          PartitionData data = new PartitionData();
          int i = pkHashConfig.lastIndexOf(":");
          if (i > 0) {
              String pkStr = pkHashConfig.substring(i + 1);
              // 变量名
              if (pkStr.equalsIgnoreCase("$pk$")) {
                  data.hashMode.autoPkHash = true;
              } else {
                  //自定义表达式 val1 ^ val2 ^ val3
                  data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
                      '^'));
              }

              pkHashConfig = pkHashConfig.substring(0,
                  i);
          } else {
              data.hashMode.tableHash = true;
          }

          if (!isWildCard(pkHashConfig)) {
              data.simpleName = pkHashConfig;
          } else {
              data.regexFilter = new AviaterRegexFilter(pkHashConfig);
          }
          datas.add(data);
      }

      return datas;
  });

所以通用的多表分区表达式如下

.*\\..*:$pk$

实践

修改rocketmq分区
在这里插入图片描述
修改instance配置
在这里插入图片描述
测试查询消息分区后的结构
在这里插入图片描述
查询rocketmq
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/331469.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2020年因果推断综述《A Survey on Causal Inference》

最近阅读了TKDD2020年的《A Survey on Causal Inference》&#xff0c;传送门&#xff0c;自己对文章按照顺序做了整理&#xff0c;同时对优秀的内容进行融合&#xff0c;如有不当之处&#xff0c;请多多指教。 文章对因果推理方法进行了全面的回顾&#xff0c;根据传统因果框…

威胁情报是什么

文章目录前言一、威胁情报是什么&#xff1f;数据与情报IOC二、威胁情报的分类1.战略情报2.技术情报3.战术情报4.运营情报三、总结四、参考前言 只要有斗争冲突&#xff0c;就有那些研究、分析和努力去了解对手的人。一场战争的输赢&#xff0c;取决于你对对手的了解&#xff0…

springboot启动过程源码

概述版本<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.3.RELEASE</version><relativePath/></parent>启动入口代码package com.ybjdw.tool;i…

如何解决混合精度训练大模型的局限性问题

混合精度已经成为训练大型深度学习模型的必要条件&#xff0c;但也带来了许多挑战。将模型参数和梯度转换为较低精度数据类型&#xff08;如FP16&#xff09;可以加快训练速度&#xff0c;但也会带来数值稳定性的问题。使用进行FP16 训练梯度更容易溢出或不足&#xff0c;导致优…

【王道数据结构】第六章(下) | 图的应用

目录 一、最小生成树 二、最短路径 三、有向⽆环图描述表达式 四、拓扑排序 五、关键路径 一、最小生成树 1、最小生成树的概念 对于一个带权连通无向图G &#xff08;V,E)&#xff0c;生成树不&#xff0c;每棵树的权(即树中所有边上的权值之和)也可能不同。设R为G的所…

【2023】Prometheus-接入Alertmanager并实现邮件告警通知

目录1.使用二进制方式安装Alertmanager2.Alertmanager配置3.alert接入prometheus4.创建告警配置文件&#xff08;在prometheus服务器&#xff09;5.测试告警1.使用二进制方式安装Alertmanager 下载安装包 wget https://github.com/prometheus/alertmanager/releases/download…

Python pip工具使用

一、pip工具 1、pip简介 pip 是一个通用的 Python包管理工具。提供了对 Python 包的查找、下载、安装、卸载的功能&#xff0c;便于我们对 Python的资源包进行管理。 在安装 Python时&#xff0c;会自动下载并且安装 pip。 &#xff08;1&#xff09;查看是否安装 pip 查看…

C/C++ :程序环境和预处理(上)

目录 程序的编译链接过程 1.编译过程中的预处理阶段 2.编译过程中的正式编译阶段 3.编译过程中的汇编阶段 4.链接过程 程序的编译链接过程 一个程序的源码文件要经过复杂的编译链接过程才能被转换为可执行的机器指令(二进制指令) 编译链接过程概述&#xff1a; 编译过程&…

java顺序存储二叉树应用实例

八大排序算法中的堆排序&#xff0c;就会使用到顺序存储二叉树。 1.线索化二叉树 1.1先看一个问题 将数列 {1, 3, 6, 8, 10, 14 } 构建成一颗二叉树. n17 问题分析: 当我们对上面的二叉树进行中序遍历时&#xff0c;数列为 {8, 3, 10, 1, 6, 14 } 但是 6, 8, 10, 14 这几个…

windows装双系统,添加ubuntu

1、查看分区 此电脑右键---管理----磁盘管理----选有空闲位置的硬盘右键----压缩卷 就会出现空闲的卷 2 制作U盘&#xff0c;U盘初始是空的 下载rufus win10系统怎么查看磁盘分区形式 【百科全说】 (bkqs.com.cn) 双击打开----如下配置 出现这个提示&#xff0c;照做 …

内存数据库Apache Derby、H2

概述 传统关系型数据库涉及大量的工作&#xff0c;如果想在Java应用程序里使用MySQL数据库&#xff0c;至少需要如下步骤&#xff1a; 安装&#xff08;可选&#xff1a;配置用户名密码&#xff09;建表&#xff08;要么从命令行进入&#xff0c;要么安装一个可视化工具&…

Java基础-网络编程

1. 网络编程入门 1.1 网络编程概述 计算机网络 是指将地理位置不同的具有独立功能的多台计算机及其外部设备&#xff0c;通过通信线路连接起来&#xff0c;在网络操作系统&#xff0c;网络管理软件及网络通信协议的管理和协调下&#xff0c;实现资源共享和信息传递的计算机系统…

Springboot扩展点之InstantiationAwareBeanPostProcessor

前言前面介绍了Springboot的扩展点之BeanPostProcessor&#xff0c;再来介绍另一个扩展点InstantiationAwareBeanPostProcessor就容易多了。因为InstantiationAwareBeanPostProcessor也属于Bean级的后置处理器&#xff0c;还继于BeanPostProcessor&#xff0c;因此Instantiatio…

vue-cli3创建Vue项目

文章目录前言一、使用vue-cli3创建项目1.检查当前vue的版本2.下载并安装Vue-cli33.使用命令行创建项目二、关于配置前言 本文讲解了如何使用vue-cli3创建属于自己的Vue项目&#xff0c;如果本文对你有所帮助请三连支持博主&#xff0c;你的支持是我更新的动力。 下面案例可供…

【C++】类与对象(上)

文章目录一、面向过程和面向对象初步认识二、类的引入三、类的定义四、类的访问限定符及封装①访问限定符②封装五、类的作用域六、类的实例化七、类对象模型①如何计算类对象大小②类对象的存储方式③结构体中内存对齐规则八、this指针①this指针的引出②this指针的特性一、面…

XCP实战系列介绍07-使用ASAP2 Editor生成A2l文件详解

本文框架 1.概述2. A2L文件编辑及生成2.1 新建项目工程2.2 加载elf文件2.3 A2L文件的项目属性配置2.4 DAQ事件的设定2.5 添加观察量2.6 添加标定量2.7 编译生成A2l1.概述 在前面一篇文章《看了就会的XCP协议介绍》中详细介绍了XCP的协议,在《XCP实战系列介绍01-测量与标定底层…

JavaScript 类继承

JavaScript 类继承 JavaScript 类继承使用 extends 关键字。 继承允许我们依据另一个类来定义一个类&#xff0c;这使得创建和维护一个应用程序变得更容易。 super() 方法用于调用父类的构造函数。 当创建一个类时&#xff0c;您不需要重新编写新的数据成员和成员函数&…

synchronized和ReentrantLock之间的区别

synchronized和ReentrantLock的区别 synchronized是一个关键字&#xff0c;是JVM内部实现的&#xff1b;ReentrantLock是标准库的一个类&#xff0c;是JVM外部基于Java实现的。synchronized在申请锁失败时会死等&#xff1b;ReentrantLock可以通过tryLock的方式等待一段时间就…

偏微分方程约束下的优化控制问题(PDE-constrained optimal control problems)

优化控制问题介绍 优化控制问题的数学形式 {min⁡(y(x),u(x))∈YUJ(y(x),u(x)),s.t. F(y(x),u(x))0in Ω,and u(x)∈Uad,\left\{\begin{aligned} &\min _{(y(\mathbf{x}), u(\mathbf{x})) \in Y \times U} J(y(\mathbf{x}), u(\mathbf{x}) ),\\ &\text { s.t. } \ \…

Python列表的元素比较

在用python处理多个列表元素时&#xff0c;需要处理的信息一般比较多且杂。这时运用Python列表元素比较的方法&#xff0c;就能快速处理列表信息&#xff0c;能更轻松访问每个元素。1 问题如何运用Python列表的元素比较解决问题。2 方法方法一 for循环&#xff0c;此情况是list…