SpringBoot结合Canal 实现数据同步

news2024/10/6 5:53:30

1、Canal介绍

     Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。

    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x,Canal通过伪装成mysql从服务向主服务拉取数据。所以首先我们要了解mysql 主从数据同步。

2、Mysql 主从数据同步

1、从库(slave)会生成两个线程,I/O线程(IOthread),SQL线程(SQLthread)。

2、当slave的I/O线程连接到master后,会去请求master的二进制日志(binlog), 此时master会通过logdump(将主库的二进制日志文件内容传输给从库的过程) 给从库传输binlog。

3、 然后slave将拿到的binlog日志依次写入Relaylog(中继日志)的最末端,同时将读取到的Master 的bin-log的文件名和位置记录到master- info文件中,作用为了让slave知道它需要从哪个位置和哪 个日志文件开始同步数据,以保证数据的一致性,并且能够及时获取到master的新的更新操作, 开始数据同步过程。slave不仅在启动时读取 master-info 文件,而且会定期更新该文件中的记 录,以确保记录都是最新的。

4、最后SQL线程会读取Relaylog,并解析为具体操作(比如DDL这种),来实现主从库的操作一致。

3、Canal 原理

1、Canal Server与MySQL建立连接后,会通过模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议获取数据库的 binlog(二进制日志)文件。

2、Canal Server解析binlog文件,通过网络将解析后的事件传输给消息中间件(Kafka,RabbitMQ、Rocket等),实现数据的实时同步。

4、实现过程

       下载Canal,地址为https://github.com/alibaba/canal/releases,然后解压

  修改Canal的配置文件(conf/canal.properties)

指定模式
        canal.serverMode = rabbitMQ
        # 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
        canal.destinations = example

        # rabbitmq 服务端 ip
        rabbitmq.host = 你的ip(注意不要加端口号哦)
        # rabbitmq 虚拟主机
        rabbitmq.virtual.host = /
        # rabbitmq 交换机
        rabbitmq.exchange = canal.exchange  (这是本例子用的交换机)
        # rabbitmq 用户名
        rabbitmq.username = 你的用户名
        # rabbitmq 密码
        rabbitmq.password = 你的密码
        rabbitmq.deliveryMode =

修改实例配置文件 conf/example/instance.properties。

#配置 slaveId,自定义,不等于 mysql 的 server Id 即可
        canal.instance.mysql.slaveId=10

        # 数据库地址:配置自己的ip和端口
        canal.instance.master.address=你的IP:端口号

        # 数据库用户名和密码
        canal.instance.dbUsername=用户名
        canal.instance.dbPassword=密码

        # 指定库和表
        canal.instance.filter.regex=.*\..*    # 这里的 .* 表示 canal.instance.master.address 下面的所有数据库

        # mq config
        # rabbitmq 的 routing key
        canal.mq.topic=canal.routing.key(这是本例子用的key)

然后重启 canal 服务。

2 、搭建RabbitMq并且 配置Exchange、Routing key。

3、Spring  Boot 实现代码

   引入依赖

<!--引入rabbitmq集成的依赖-->       
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

编辑配置文件

spring:
  #rabbitmq
  rabbitmq:
    #    host: 192.168.31.189
    host: 192.168.1.12
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:         #手动确认消息 ack
        acknowledge-mode: manual
        retry:
          enabled: true
          max-attempts: 3
          max-interval: 1000ms
server:
  port: 8888

   定义队列的消息接受的数据对象

@Data
public class CanalMessage<T> {
    private String type;
    private String table;
    private List<T> data;
    private String database;
    private Long es;
    private Integer id;
    private Boolean isDdl;
    private List<T> old;
    private List<String> pkNames;
    private String sql;
    private Long ts;
}

定义rabbitmq的配置,代码如下

@Configuration
@Slf4j
public class RabbitConfig {

    /**
     * 消息序列化配置
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        // SimpleRabbitListenerContainerFactory 是 RabbitMQ 提供的一个实现了 RabbitListenerContainerFactory 接口的简单消息监听器容器工厂。
        // 它的作用是创建和配置 RabbitMQ 消息监听器容器,用于监听和处理消息。
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //ConnectionFactory 是 RabbitMQ 提供的一个接口,用于创建 RabbitMQ 的连接
        factory.setConnectionFactory(connectionFactory);
        //使用了 Jackson2JsonMessageConverter 将消息转换为 JSON 格式进行序列化和反序列化
        factory.setMessageConverter(  new Jackson2JsonMessageConverter());
        return factory;
    }
}

消费的代码

@Component
@RabbitListener(queues = "data")
public class MqNotifyReceive {
    @RabbitHandler
    public void receive(CanalMessage CanalMessage) {
        //进行同步业务处理。。。 
        
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1658144.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用tkinter开发的一款可扫描并删除本地文件敏感词的Windows软件

大致功能&#xff1a;可指定扫描Windows上的某个目录的所有文件&#xff0c;单个文件扫描&#xff0c;目前适配支持的文件后缀有&#xff1a;"pdf"、"txt、"doc"、"docx"&#xff0c;软件是开源的&#xff0c;大家可以在此基础上扩展更多类…

什么是HTTP?

什么是HTTP&#xff1f; HTTP基本概念HTTP 是什么&#xff1f;HTTP 常见的状态码有哪些&#xff1f;HTTP 常见字段有哪些&#xff1f; HTTP特性HTTP/1.1 的优点有哪些&#xff1f;HTTP/1.1 的缺点有哪些&#xff1f; HTTP基本概念 HTTP 是什么&#xff1f; HTTP 是超文本传输…

44.乐理基础-音符的组合方式-附点

内容参考于&#xff1a; 三分钟音乐社 首先如下图&#xff0c;是之前的音符&#xff0c;但是它不全&#xff0c;比如想要一个三拍的音符改怎样表示&#xff1f; 在简谱中三拍&#xff0c;在以四分音符为一拍的情况下&#xff0c;在后面加两根横线就可以了&#xff0c;称为附点…

C++:重载、重写与重定义

一、重载、重写与重定义的概念 C中&#xff0c;重载、重写和重定义是三个与函数和类成员相关的概念&#xff0c;但它们具有不同的含义和用途。 重载&#xff1a;是指在同一作用域内&#xff0c;可以有多个名称相同但参数列表&#xff08;参数类型、参数个数或参数顺序&#x…

Web3加密空投入门:空投类型有哪些?如何避免限制?

今天分享空投如何避免限制以提高效率&#xff0c;增加成功几率&#xff0c;首先我们来了解什么是空投加密&#xff0c;有哪些空投类型。 一、什么是空投加密&#xff1f; 加密货币空投是一种营销策略&#xff0c;包括向用户的钱包地址发送免费的硬币或代币。 加密货币项目使用…

【算法练级js+java】旋转字符串判断是否相等

每一天一道算法题训练&#xff0c;努力打开编程思维&#xff0c;才能进大厂光明正大的泡心仪的小姐姐&#xff01;&#xff01;(手动捂脸) 题目 /** * 给定字符创A和B * 旋转字符串A,就是把最左边的移动到最右边 * 比如A‘abcde’,在移动一次之后结果就是bcdea * 如果若干次之…

揭秘新时代的内容创作:一键生成的AI黑科技

在数字媒体的浪潮下&#xff0c;内容创作已成为连接人与信息的重要桥梁。然而&#xff0c;头条、公众号等平台上的爆文创作&#xff0c;对很多内容创作者来说却是一项挑战。“从选题到找素材&#xff0c;再到成文&#xff0c;”这个过程不仅耗时至少1到2个小时&#xff0c;而且…

word图片水印

一、word中旧水印如何删除 打开word模板&#xff0c;想要删除旧水印&#xff0c;如下图所示操作&#xff0c;但是旧水印删除不掉。 以为上传新水印图片会替换掉旧水印&#xff0c;结果显示了2个水印&#xff0c;要怎么删除呢&#xff1f; 如下截图所示&#xff0c;双击打开页…

如何在CentOS上解决Python版本冲突和路径问题

在使用CentOS等Linux系统时&#xff0c;安装多个Python版本可能会导致版本冲突和路径问题。当你运行python3命令时&#xff0c;系统可能不会调用你期望的Python版本&#xff0c;这可能会导致运行错误或者其他依赖问题。下面是一篇详细的博客&#xff0c;介绍如何解决这种Python…

微信小程序发布,推广等步骤

发布小程序 一.首先写好小程序后在微信开发者工具的右上角找到上传并点击 2.填写本次开发版本的版本号和备注信息&#xff0c;点击上传 3.登录微信小程序管理后台 微信公众平台 在管理处找到版本管理 4.在版本管理页面可以看到刚刚提交的小程序已经上传到这里&#xff0c;点击…

如何在两台电脑之间共享文件(超详细步骤)

所需物品&#xff1a; 两台以上电脑一个路由器 步骤1&#xff1a; 将两台电脑用网线或无线网连接至同一路由器下 只有在同一路由器下才能形成局域网&#xff0c;从而才能正常共享文件。例如通常打印机和电脑也需要保持在同一路由器下才能正常打印文件。 步骤2&#xff1a;设置…

【全开源】Java洗衣清洁服务同城清洗服务小程序源码

特色功能&#xff1a; 在线预约与支付&#xff1a;用户可以通过洗衣小程序在线预约洗衣服务&#xff0c;并选择支付方式进行支付&#xff0c;如微信支付、支付宝等。这种在线预约和支付的方式极大地方便了用户&#xff0c;提高了服务的便捷性。智能推荐与选择&#xff1a;根据…

Web 安全基础理论

Web 安全基础理论 培训、环境、资料、考证 公众号&#xff1a;Geek极安云科 网络安全群&#xff1a;624032112 网络系统管理群&#xff1a;223627079 网络建设与运维群&#xff1a;870959784 移动应用开发群&#xff1a;548238632 短视频制作群&#xff1a; 744125867极安云…

python编程“常识”【pip安装路径、计算、pycharm中的terminal运行前面的PS修改成自己环境】

一、默认的pip install包路径&#xff1a; pip show pip 二、计算 打开cmd&#xff0c;输入&#xff1a; ipython 例如你要计算2的13次方&#xff1a; ok. 三、pycharm中的terminal运行前面的PS修改成自己环境 未修改前&#xff1a; 修改过程&#xff1a; 打开设置找到too…

安卓模拟器访问主机局域网

误打误撞能够访问主机局域网了 但是不太懂是因为哪一部分成功的 先记录一下 PC&#xff1a;mac系统 安卓编译器&#xff1a;Android Studio 步骤 只需要在PC上进行设置 1. 在【设置】中&#xff0c;打开已连接的Wi-Fi的【详细信息】 2. TCP/IP --> 配置IPv6&#xff0c;修…

STM32--4G DTU 及 阿里云

模块概述 ATK-IDM750C/IDM751C 是正点原子(ALIENTEK)团队开发的一款高性能 4G Cat1 DTU 产品&#xff0c; 支持移动 4G、联通 4G 和电信 4G 手机卡。它以高速率、低延迟和无线数传作为核心功能&#xff0c; 可快速解决应用场景下的无线数传方案。 它支持 TCP/UDP/HTTP/MQTT/DN…

电脑windows系统压缩解压软件-Bandizip

一、软件功能 Bandizip是一款功能强大的压缩和解压缩软件&#xff0c;具有快速拖放、高速压缩、多核心支持以及广泛的文件格式支持等特点。 Bandizip软件的功能主要包括&#xff1a; 1. 支持多种文件格式 Bandizip可以处理多种压缩文件格式&#xff0c;包括ZIP, 7Z, RAR, A…

2024-05-09 Ubuntu上面用ffmpeg把jpeg图像转成yuv、rgb格式文件,通过ffplay命令显示这些文件成图像

一、安装 FFmpeg: 如果你的Ubuntu系统中没有安装 FFmpeg,可以通过以下命令来安装: sudo apt update sudo apt install ffmpeg 二、测试原图,cowboy_girl_1024X1280.jpeg,分辨率是1024X1280. 三、使用 ffmpeg -pix_fmts 命令查看ffmpeg支持的格式 ffmpeg -pix_fmts 四、使…

如何预防最新的Mallox变种rmallox勒索病毒感染您的计算机?

导言&#xff1a; 在数字化浪潮中&#xff0c;网络安全如履薄冰。新兴的.rmallox勒索病毒&#xff0c;以其独特的攻击方式和狡猾的战术&#xff0c;给全球网络安全带来了前所未有的挑战。本文将深入剖析.rmallox勒索病毒的战术&#xff0c;并提出一系列创新的防御策略&#xf…

[uniapp 地图组件] 小坑:translateMarker的回调函数,会调用2次

大概率是因为旋转和移动是两个动画&#xff0c;动画结束后都会分别调用此函数 即使你配置了 【不旋转】它还是会调用两次&#xff0c; 所以此处应该是官方的bug