SpringCloud 整合 Canal+RabbitMQ+Redis 实现数据监听

news2024/11/20 6:16:19

1Canal介绍

Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

Canal是如何同步数据库数据的呢?

Canal通过伪装成mysql从服务向主服务拉取数据,所以先来了解一下MySQL的主从复制吧

2MySQL主从复制原理

1、从库(slave)会生成两个线程,I/O线程(IOthread),SQL线程(SQLthread)。

2、当slave的I/O线程连接到master后,会去请求master的二进制日志(binlog), 此时master会通过logdump(将主库的二进制日志文件内容传输给从库的过程) 给从库传输binlog。

3、 然后slave将拿到的binlog日志依次写入Relaylog(中继日志)的最末端,同时将读取到的Master 的bin-log的文件名和位置记录到master- info文件中,作用为了让slave知道它需要从哪个位置和哪 个日志文件开始同步数据,以保证数据的一致性,并且能够及时获取到master的新的更新操作, 开始数据同步过程。slave不仅在启动时读取 master-info 文件,而且会定期更新该文件中的记 录,以确保记录都是最新的。

4、最后SQL线程会读取Relaylog,并解析为具体操作(比如DDL这种),来实现主从库的操作一致, 最终实现数据一致;

大致了解完了MySQL的主从复制,接着我们看Canal就简单啦。

3Canal工作原理

1、Canal Server与MySQL建立连接后,会通过模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议获取数据库的 binlog(二进制日志)文件。

2、Canal Server解析binlog文件,通过网络将解析后的事件传输给消息中间件(Kafka,RabbitMQ等),实现数据的实时同步。

了解完canal的原理后,我们就正式开始RabbitMQ+Canal+Redis实现缓存和数据库数据一致的功能。

4RabbitMQ+Canal+redis工作原理

通过上图很好理解:

  • APP向数据库进行写操作(比如我们更新商品信息啥的)

  • Canal监听到数据库发生变化,便会向rabbitMQ传递数据库发生变化的消息。

  • 消费者就可以从rabbitMQ获取这些消息,然后进行删除缓存操作。

下面通过实战让我们更好地理解是如何实现缓存和数据库数据一致性的。

5实战配置

Canal 配置

修改 conf/canal.properties 配置

# 指定模式
canal.serverMode = rabbitMQ
# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example 

# rabbitmq 服务端 ip
rabbitmq.host = 你的ip(注意不要加端口号哦)
# rabbitmq 虚拟主机 
rabbitmq.virtual.host = / 
# rabbitmq 交换机  
rabbitmq.exchange = canal.exchange  (这是本例子用的交换机)
# rabbitmq 用户名
rabbitmq.username = 你的用户名
# rabbitmq 密码
rabbitmq.password = 你的密码
rabbitmq.deliveryMode =

修改实例配置文件 conf/example/instance.properties

#配置 slaveId,自定义,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10 

# 数据库地址:配置自己的ip和端口
canal.instance.master.address=你的IP:端口号
 
# 数据库用户名和密码 
canal.instance.dbUsername=用户名
canal.instance.dbPassword=密码
 
# 指定库和表
canal.instance.filter.regex=.*\..*    # 这里的 .* 表示 canal.instance.master.address 下面的所有数据库
  
# mq config
# rabbitmq 的 routing key
canal.mq.topic=canal.routing.key(这是本例子用的key)

然后重启 canal 服务。

RabbitMQ 配置

这样rabbitMQ就配置完啦,下面就是实战代码啦。

6实战代码

CanalMessage: Canal传来的消息

@NoArgsConstructor
@Data
public class CanalMessage<T> {
    private String type;
    private String table;
    private List<T> data;
    private String database;
    private Long es;
    private Integer id;
    private Boolean isDdl;
    private List<T> old;
    private List<String> pkNames;
    private String sql;
    private Long ts;
}

RabbitMQ配置类

@Configuration
@Slf4j
public class RabbitConfig {

    /**
     * 消息序列化配置
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        // SimpleRabbitListenerContainerFactory 是 RabbitMQ 提供的一个实现了 RabbitListenerContainerFactory 接口的简单消息监听器容器工厂。
        // 它的作用是创建和配置 RabbitMQ 消息监听器容器,用于监听和处理消息。
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //ConnectionFactory 是 RabbitMQ 提供的一个接口,用于创建 RabbitMQ 的连接
        factory.setConnectionFactory(connectionFactory);
        //使用了 Jackson2JsonMessageConverter 将消息转换为 JSON 格式进行序列化和反序列化
        factory.setMessageConverter(  new Jackson2JsonMessageConverter());
        return factory;
    }
}

将消息转换为JSON格式,才能映射到CanalMessage上。

RabbitMQ+Canal监听处理类

@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {

    private final SysMenuService menuService;

    //@RabbitListener(queues = "canal.queue")
    public void handleDataChange(@Payload CanalMessage message) {
        String tableName = message.getTable();

        log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
        if (Arrays.asList("sys_menu", "sys_role", "sys_role_menu").contains(tableName)) {
            log.info("======== 清理菜单路由缓存 ========");
            menuService.cleanCache();
        }
    }
}

menuService的cleanCache()是把登录时的路由列表缓存清除掉,

具体可去源码查看,在最底下。

这样我们实现缓存和数据库数据一致性的功能就完成啦,接下来测试一下。

7测试

我们直接通过手动修改数据库来完成测试。

图片

我们在菜单表修改菜单管理的内容改成菜单管理1,点击保存

图片

可以看到更新操作已经被监听到啦。接着就完成清理缓存操作咯,然后就可以防止缓存和数据库数据不一致的问题啦。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1337835.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python in Visual Studio Code 2023年12月发布

作者&#xff1a;Courtney Webster 排版&#xff1a;Alan Wang 我们很高兴地宣布 Visual Studio Code 的 Python 和 Jupyter 扩展将于 2023 年 12 月发布&#xff01; 此版本包括以下公告&#xff1a; 可配置的调试选项已添加到“运行”按钮菜单可以使用 Pylance 显示类型层次…

声明 | 为打击假冒账号、恶意抄袭账号等诈骗活动,提升本账号权威,本博主特此郑重声明

声明 | 为打击假冒账号、恶意抄袭账号诈骗活动&#xff0c;提升本账号权威&#xff0c;本博主特此郑重声明 一、本账号为《机器学习之心》博主CSDN唯一官方账号&#xff0c;唯一联系方式见文章底部。 二、《机器学习之心》博主未授权任何第三方账号进行模型合作、程序设计、源…

odoo17核心概念view7——listview总体框架分析

这是view系列的第七篇文章&#xff0c;今天主要介绍我们最常用的list视图。 1、先看list_view,这是主文件 /** odoo-module */import { registry } from "web/core/registry"; import { RelationalModel } from "web/model/relational_model/relational_mode…

2022年山东省职业院校技能大赛高职组云计算赛项试卷第二场-容器云

2022年山东省职业院校技能大赛高职组云计算赛项试卷 目录 【赛程名称】云计算赛项第二场-容器云 需要竞赛软件包以及资料可以私信博主&#xff01; 【赛程名称】云计算赛项第二场-容器云 【赛程时间】2022-11-27 09:00:00至2022-11-27 16:00:00 说明&#xff1a;完成本任务…

Python 使用fake_useragent生成随机User-Agent

大多数情况下&#xff0c;需要设置请求头。而在请求头中&#xff0c;随机更换User-Agent可以避免触发相应的反爬机制。使用第三方库fake-useragent便可轻松生成随机User-Agent。 安装使用 以下简单介绍fake-useragent的安装使用&#xff0c;以及可能出现的问题和解决方法。 安…

数据库 基础面试第一弹

1. SQL语句类型 1. DDL&#xff08;Data Definition Language&#xff0c;数据定义语言&#xff09;&#xff1a; DDL语句用于定义数据库对象&#xff08;如表、索引、视图等&#xff09;。常见的DDL语句包括&#xff1a; CREATE&#xff1a;用于创建数据库对象&#xff0c;如…

Nacos2.1.2改造适配达梦数据库7.0

出于业务需求&#xff0c;现将Nacos改造适配达梦数据库7.0&#xff0c;记录本次改造过程。 文章目录 一、前期准备二、适配流程1、项目初始化2、引入驱动3、源码修改 三、启动测试四、打包测试 一、前期准备 Nacos源码&#xff0c;版本&#xff1a;2.1.2&#xff1a;源码下载…

使用poi将pptx文件转为图片详解

目录 项目需求 后端接口实现 1、引入poi依赖 2、代码编写 1、controller 2、service层 测试出现的bug 小结 项目需求 前端需要上传pptx文件&#xff0c;后端保存为图片&#xff0c;并将图片地址保存数据库&#xff0c;最后大屏展示时显示之前上传的pptx的图片。需求看上…

Servlet见解2

4 创建servlet的三种方式 4.1 实现Servlet接口的方式 import javax.servlet.*; import javax.servlet.annotation.WebServlet; import java.io.IOException;WebServlet("/test1") public class Servlet1 implements Servlet {Overridepublic void init(ServletConf…

alertmanage调用企业微信告警(k8s内部署)

一、前言 alertmanage调用企业微信应用告警会比直接使用钉钉告警更麻烦一点&#xff0c;调用企业微信应用告警需要在应用内配置企业可信ip&#xff0c;不然调用企业微信接口就会报错&#xff0c;提示ip地址有风险 二、部署 先自行创建企业微信&#xff0c;再使用管理后台创建应…

视频遥测终端机的设计需求

目录 1.目的 2.参考文件 3.总体描述 4.硬件资源描述 4.1微控制单元 4.2视频处理单元 4.3性能指标 5.功能要求 5.1系统参数要求 5.1.1系统管理 5.1.2系统配置 5.1.2.1一般参数 5.1.2.2编码参数 5.1.2.3网络参数 5.1.2.4网络服务 5.1.2.5OSD参数 5.1.2.6抓拍 5.…

MYSQL一一约束

概述&#xff1a; ①概念约束是作用于表中字段的规则&#xff0c;用于限制存储在表中的数据 ②目的&#xff1a;保证数据库中的数据的正确性&#xff0c;有效性和完整性 ③分类&#xff1a; 注意&#xff1a;约束是作用于表中字段上的&#xff0c;可以在创建表/修改表的时候…

【C语言】指针详解(四)

目录 1.assert断言 2.指针的使用和传址调用 2.1strlen的模拟使用 2.2传值调用和传址调用 1.assert断言 assert.h头文件定义了宏 assert()&#xff0c;用于在运行时确保程序符合指定条件&#xff0c;如果不符合&#xff0c;就报错终止运行。这个宏常常被称为“断言”。 例如…

主流级显卡的新选择,Sparkle(撼与科技)Intel Arc A750兽人体验分享

▼前言 对于玩家而言&#xff0c;英特尔独显的出现不仅打破了NVIDIA与AMD双雄天下的局面&#xff0c;而且旗下的Arc A系列显卡还拥有不俗的做工性能以及颇具优势的价格&#xff0c;无论是升级或者是装新机都非常合适。如果要在Arc A系列当中选一个性能不俗&#xff0c;能够满足…

从AMI镜像恢复AWS Amazon Linux 2实例碰到的VNC服务以及Chrome浏览器无法启动的问题

文章目录 小结问题及解决VNC服务无法启动Chrome浏览器无法启动 参考 小结 将Amazon Linux 2保存为AMI (Amazon Machine Images)后&#xff0c;恢复成EC2 Instance (实例)后&#xff0c;VNC服务以及Chrome浏览器无法启动&#xff0c;进行了解决。 问题及解决 如果要将一个EC2…

算法训练第四十八天|198. 打家劫舍、213. 打家劫舍 II、337. 打家劫舍 III

198. 打家劫舍&#xff1a; 题目链接 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报…

LSTM中文新闻分类源码详解

LSTM中文新闻分类 一、导包二、读取数据三、数据预处理1.分词、去掉停用词和数字、字母转换成小写等2.新闻文本标签数值化 三、创建词汇表/词典1.data.Field()2.空格切分等3.构建词汇表/词典使用训练集构建单词表&#xff0c;vectorsNone:没有使用预训练好的词向量,而是使用的是…

PyTorch深度学习实战(27)——变分自编码器(Variational Autoencoder, VAE)

PyTorch深度学习实战&#xff08;27&#xff09;——变分自编码器 0. 前言1. 变分自编码器1.1 自编码器的局限性1.2 VAE 工作原理1.3 VAE 构建策略1.4 KL 散度1.5 重参数化技巧 2. 构建 VAE小结系列链接 0. 前言 变分自编码器 (Variational Autoencoder, VAE) 是一种生成模型&…

华为ipv6配置之ospf案例

R1 ipv6 ospfv3 1 router-id 1.1.1.1 //必须要手动配置ospf id&#xff0c;它不会自动生成 interface GigabitEthernet0/0/0 ipv6 enable ipv6 address 2000::2/96 ospfv3 1 area 0.0.0.0 interface LoopBack0 ipv6 enable ipv6 address 2001::1/96 ospfv3 1 area 0.0.0.0 R2…

腾讯云4核8G服务器三年优惠价格表

腾讯云轻量服务器4核8G12M有三年优惠价吗&#xff1f;有&#xff0c;但是不怎么优势&#xff0c;相对于云轻量2核2G4M带宽三年价格是540元、2核4G5M带宽3年优惠价756元&#xff0c;4核8G12M轻量应用服务器三年价格是5292元&#xff0c;怎么样&#xff1f;还想买吗&#xff1f;阿…