Canal安装使用

news2025/2/26 7:22:36

一 Canal介绍

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

背景
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析了;

工作原理
在这里插入图片描述
原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,mysql master收到dump请求,开始推送binary log给slave(也就是canal),canal解析binary log对象(原始为byte流);
在这里插入图片描述

二 linux下安装配置启动Canal

下载
首先,下载canal安装包:canal下载地址
在这里插入图片描述
安装步骤
1、创建一个canal文件夹,上传压缩包并解压
2、修改canal配置文件

vi conf/example/instance.properties

在这里插入图片描述
3、修改mysql配置

vi /etc/my.cnf 
log-bin=mysql-bin             #添加这一行就ok 
binlog-format=ROW           #选择row模式 
server-id=1       #配置mysql replaction需要定义,不能和canal的slaveI重复
binlog-do-db=micromall

4、执行mysql 创建canal用户

create user canal identified by 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

查看是否授权成功:

select * from user  where  user='canal'

canal的工作原理

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal );
  3. canal 解析 binary log 对象(原始为 byte 流);

5、启动canal

cd bin
./startup.sh

三 测试Canal

逻辑:
mysql开启主从同步
启动Canal,拿到mysql的binlog日志
消息中间件获取binlog日志
添加依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

修改数据库的值,发现日志里有新的输出
在这里插入图片描述

四 接入中间件

目的:修改mysql数据后,把消息同步到RocketMQ

修改instance 配置文件

vi conf/example/instance.properties
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

instance.properties完整版

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
#canal.instance.master.address=192.168.65.232:3306
canal.instance.master.address=192.168.65.232:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=micromall
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=micromall.pms_product,micromall.sms_flash_promotion_product_relation
# table black regex
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=productDetailChange
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

修改canal 配置文件

vi /usr/local/canal/conf/canal.properties
##################################################
#########                    MQ     #############
##################################################
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 192.168.1.150:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
#消息生产组名
canal.mq.producerGroup = Canal-Producer
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 30
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =

canal.properties完整版

#################################################
######### 		common argument		############# 
#################################################
#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### 		destinations		############# 
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     MQ 		     #############
##################################################
##################################################
#########                    MQ     #############
##################################################
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
#消息生产组名
canal.mq.producerGroup = canalProducer
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 30
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =

参数说明:
在这里插入图片描述

五 Canal内部原理

在这里插入图片描述

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)
mysql> show variables like 'binlog_format';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.00 sec)

解析开始:
com.alibaba.otter.canal.parse.inbound.AbstractEventParser#start

六 Canal集群高可用

canal的ha分为两部分,canal server和canal client分别有对应的ha实现

canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.

canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。
在这里插入图片描述
大致步骤:

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1504845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MyBatis3源码深度解析(七)JDBC单连接事务

文章目录 前言2.7 JDBC单连接事务2.7.1 事务的开启与提交2.7.2 事务隔离级别2.7.2.1 并发访问问题&#xff08;1&#xff09;脏读&#xff08;2&#xff09;不可重复读&#xff08;3&#xff09;幻读 2.7.2.2 事务隔离级别&#xff08;1&#xff09;TRANSACTION_NONE&#xff1…

JVM内存问题排错最佳实践

写在文章开头 上一篇的文章中分享了一个入门级别的调优实践,收到很多读者的好评,所以笔者今天再次分享一个进阶一点的案例,希望对近期在面试的读者对于JVM这一块的实践经验有所帮助。 Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,…

光伏数字化管理平台:驱动绿色能源革命的智能化引擎

随着全球对可再生能源需求的不断增长&#xff0c;光伏产业已经成为推动绿色能源革命的重要力量。在这个背景下&#xff0c;光伏数字化管理平台应运而生&#xff0c;以其强大的数据处理、实时监控和智能优化功能&#xff0c;为光伏电站的运营管理和维护带来了革命性的变革。 光伏…

有源电桥电路

有源电桥电路 有源电桥由A3运放的正向输入端与负向输入端电压相等且为零可知&#xff1a;G点&#xff08;待测阻抗Zx与被测阻抗Rs的连接点&#xff09;电平一直为零&#xff0c;也就是平衡点虚地点&#xff0c;Ux与Us也就变成参照虚地点的绝对相量电压。并且根据运放的虚断原理…

7-16 计算符号函数的值

对于任一整数n&#xff0c;符号函数sign(n)的定义如下&#xff1a; 请编写程序计算该函数对任一输入整数的值。 输入格式: 输入在一行中给出整数n。 输出格式: 在一行中按照格式“sign(n) 函数值”输出该整数n对应的函数值。 输入样例1: 10输出样例1: sign(10) 1输入样…

unity学习(54)——选择角色界面--解析赋值服务器返回的信息1

1.decode这种照猫画虎的工作 把逆向出来UserHandler.cs中的内容&#xff0c;融到自建客户端的MessageManager.cs中&#xff1a; 2.此时登录账号&#xff0c;马上显示当前账号下已有三名角色&#xff1a; 此时返回数据包中的command的值是1&#xff1a; 3.当注册玩家数超过三名…

springboot255基于spring boot的疫情信息管理系统

疫情信息管理系统的设计与实现 摘要 近年来&#xff0c;信息化管理行业的不断兴起&#xff0c;使得人们的日常生活越来越离不开计算机和互联网技术。首先&#xff0c;根据收集到的用户需求分析&#xff0c;对设计系统有一个初步的认识与了解&#xff0c;确定疫情信息管理系统…

人工智能|机器学习——DBSCAN聚类算法(密度聚类)

1.算法简介 DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的聚类算法&#xff0c;簇集的划定完全由样本的聚集程度决定。聚集程度不足以构成簇落的那些样本视为噪声点&#xff0c;因此DBSCAN聚类的方式也可以用于异常点的检测。 2.算法原…

Xss-labs-master 1-16关

第一关 <?php ini_set("display_errors", 0); $str $_GET["name"]; echo "<h2 aligncenter>欢迎用户".$str."</h2>"; ?> <center><img srclevel1.png></center> <?php echo "&l…

【NR技术】 3GPP支持无人机服务的关键性能指标

1 性能指标概述 5G系统传输的数据包括安装在无人机上的硬件设备(如摄像头)收集的数据&#xff0c;例如图片、视频和文件。也可以传输一些软件计算或统计数据&#xff0c;例如无人机管理数据。5G系统传输的业务控制数据可基于应用触发&#xff0c;如无人机上设备的开关、旋转、升…

实时智能应答3D数字人搭建

语音驱动口型的算法 先看效果&#xff1a; 你很快就可以帮得上我了 FACEGOOD 决定将语音驱动口型的算法技术正式开源&#xff0c;这是 AI 虚拟数字人的核心算法&#xff0c;技术开源后将大程度降低 AI 数字人的开发门槛。FACEGOOD是一家国际领先的3D基础软件开发商&#xff0c;…

【sgExcelGrid】自定义组件:简单模拟Excel表格拖拽、选中单元格、横行、纵列、拖拽圈选等操作

特性&#xff1a; 可以自定义拖拽过表格可以点击某个表格&#xff0c;拖拽右下角小正方形进行任意方向选取单元格支持选中某一行、列支持监听selectedGrids、selectedDatas事件获取选中项的DOM对象和数据数组支持props自定义显示label字段别名 sgExcelGrid源码 <template&g…

MySQL主从读写分离之Proxysql(openEuler版)

实验目的&#xff1a; 基于proxysql实现MySQL的主从读写分离。 实验过程&#xff1a; 前期准备&#xff1a; 一共有四台虚拟机&#xff0c;其中三台为配置好的一主两从虚拟机&#xff0c;还有一台干净的虚拟机用来配置proxysql。 主机名地址master192.168.27.137node1192.…

如何在Windows系统使用固定tcp公网地址ssh远程Kali系统

文章目录 1. 启动kali ssh 服务2. kali 安装cpolar 内网穿透3. 配置kali ssh公网地址4. 远程连接5. 固定连接SSH公网地址6. SSH固定地址连接测试 简单几步通过[cpolar 内网穿透](cpolar官网-安全的内网穿透工具 | 无需公网ip | 远程访问 | 搭建网站)软件实现ssh 远程连接kali! …

【机器学习300问】30、准确率的局限性在哪里?

一、什么是准确率&#xff1f; 在解答这个问题之前&#xff0c;我们首先得先回顾一下准确率的定义&#xff0c;准确率是机器学习分类问题中一个很直观的指标&#xff0c;它告诉我们模型正确预测的比例&#xff0c;即 还是用我最喜欢的方式&#xff0c;举例子来解释一下&#xf…

业务随行简介

定义 业务随行是一种不管用户身处何地、使用哪个IP地址&#xff0c;都可以保证该用户获得相同的网络访问策略的解决方案。 背景 在企业网络中&#xff0c;为实现用户不同的网络访问需求&#xff0c;可以在接入设备上为用户部署不同的网络访问策略。在传统园区网络中&#xf…

文本生成视频:从 Write-a-video到 Sora

2024年2月15日&#xff0c;OpenAI 推出了其最新的文本生成视频模型——Sora。Sora 能够根据用户的指令生成一分钟长度的高质量视频内容。这一创新的发布迅速在社会各界引发了广泛关注与深入讨论。本文将围绕本实验室发表于SIGGRAPH AISA 的 Write-a-video和 Sora 展开&#xff…

修改简化docker命令

修改|简化docker命令 使用命令打开 .bashrc 文件&#xff1a; vim ~/.bashrc在文件中添加类似以下行来创建别名&#xff1a; # 查看所有容器 alias disdocker images # 查看运行容器 alias dpsdocker ps # 查看所有容器 alias dpsadocker ps -a # 停止容器 alias dsdocker s…

python 蓝桥杯之动态规划入门

文章目录 DFS滑行&#xff08;DFS 记忆搜索&#xff09; 思路&#xff1a; 要思考回溯怎么写&#xff08;入参与返回值、递归到哪里&#xff0c;递归的边界和入口&#xff09; DFS 滑行&#xff08;DFS 记忆搜索&#xff09; 代码分析&#xff1a; 学会将输入的数据用二维列表…

云原生架构设计:分布式消息队列技术解析

消息队列是在消息传输过程中保存消息的容器&#xff0c;消息队列管理器在将消息从源到目标时充当中间人的角色&#xff0c;消息队列的主要目的是提供路由并保证消息的可靠传递。如果发送消息时接收者不可用&#xff0c;那消息队列就会保留消息&#xff0c;直到下次成功消费为止…