Canal对MySQL进行数据迁移

news2025/1/11 4:05:25

Canal简单介绍

贴个官方网址:阿里巴巴MySQL binlog 增量订阅&消费组件

架构图:
基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

以上资料来源官网

Canal的简单使用

MySQL配置

首先需要安装MySQL数据库,目前笔者使用的MySQL版本是最新的8.0.33,这个过程就不在赘述了

在Windows下,一般在ProgramData/MySQL文件夹中就能找到配置文件"mysql.ini",打开,搜索修改或新建以下选项:

# 指定服务的id,这个要与canal中的区分开,因为每个服务节点的id都要不一样
server-id=1
# 生成的binlog文件的前缀名称,Windows下binlog文件一般存在ProgramData/MySQL/Data文件夹中
log-bin="NS9052929-bin"
# binlog日志的记录方式:row、statement、mixed
binlog_format=row
# 需要记录binlog的数据库,使用逗号分割可以指定多个,如不配置则是所有
binlog-do-db=canal-demo

然后重启MySQL数据库服务,Windows在服务窗口中就可以重启
在这里插入图片描述

新建数据库

新建一个架构(数据库):canal-demo

CREATE DATABASE `canal-demo` 
/* DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */

向其中添加一张表

-- auto-generated definition
create table user
(
    id       int auto_increment comment '用户id'
        primary key,
    username varchar(100)   not null comment '用户名',
    age      int default -1 not null comment '用户年龄'
);

Canal的配置

在github中的release中下载压缩包:Releases-alibaba/canal
我下载了最新版本
在这里插入图片描述

把它解压到文件中,修改两个配置文件:
在这里插入图片描述
修改cana.properties,将其中的配置项修改:

# 这个要与上面图片中的文件夹名对应起来,其实是对应多个实例
# 如果要新建实例,复制一个example,改名字,并修改其中的配置文件即可
canal.destinations = example
# 设置服务端口,默认为11111
canal.port = 11111
# 设置服务模式,因为下面是对接Java,因此使用TCP
canal.serverMode = tcp
# 设置数据库的连接账号以及密码
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal

修改实例中的配置文件:instance.properties

# 设置数据库路径
canal.instance.master.address=127.0.0.1:3306
# 设置数据库的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 服务id,需要与mysql中的区分开
canal.instance.mysql.slaveId=20

修改完成后,双击bin/startup.bat,启动canal,看到下面页面则说明启动成功:
在这里插入图片描述

结合Java使用

新建一个maven项目, 向pom.xml文件中加入以下依赖:

<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
</dependency>

编写一个客户端程序,连接canal:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class CanalClient {

    public static void main(String[] args) {
        // hostname, port, destination, username, password,username和password默认为空
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            // 监听的表,格式为数据库.表名,数据库.表名
            connector.subscribe("canal-demo.*");
            // 不断循环获取
            while (true) {
                Message message = connector.getWithoutAck(100); // 获取指定数量的数据
                List<Entry> entries = message.getEntries();
                // 如果没有数据的话就等待1秒
                if (entries.isEmpty()) {
                    log.info("没有数据,休息一下");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    // 有数据的话循环打印出数据
                    for (Entry entry : entries) {
                        String tableName = entry.getHeader().getTableName();
                        log.info("表名:{}", tableName);
                        // 判断Entry类型是否为ROW变换
                        EntryType entryType = entry.getEntryType();
                        if (EntryType.ROWDATA.equals(entryType)) {
                            log.info("ROW变换");
                            // 序列化数据
                            ByteString storeValue = entry.getStoreValue();
                            // 反序列化数据
                            RowChange rowChange = RowChange.parseFrom(storeValue);
                            // 获取事件类型
                            log.info("事件类型:{}", rowChange.getEventType());
                            // 获取具体数据
                            List<RowData> rowDatasList = rowChange.getRowDatasList();
                            for (RowData rowData : rowDatasList) {
                                List<Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                Map<String, Object> beforeMap = new HashMap<>();
                                for (Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                log.info("变化前的数据:{}", beforeMap);
                                List<Column> afterColumnsList = rowData.getAfterColumnsList();
                                Map<String, Object> afterMap = new HashMap<>();
                                for (Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                log.info("变化后的数据:{}", afterMap);
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 最后关闭连接
            connector.disconnect();
        }
    }
}

运行程序,insert、update、delete监控的数据库中的数据,就会看到控制台中有打印消息
在这里插入图片描述

结合Spring Boot使用

新建一个Spring Boot应用,向其中添加以下依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>4.0.0-rc-2</version>
        </dependency>
    </dependencies>

Spring Booot版本是2.7.11,使用的JDK8
依赖中的canal-spring-boot-starter是其他开源者对canal在Spring Boot中的集成,测试不支持JDK17的版本,因此Spring Boot版本只能为3.0以下。

如果需要使用JDK17的话,也就是Spring Boot 3.0以上或者Spring 6的话,可以用另外一个开发者的包:behappy-canal,当然还有其他开发者也做了canal的集成,大家自己尝试下吧~

<dependency>
    <groupId>io.github.behappy-project</groupId>
    <artifactId>behappy-canal-spring-boot-starter</artifactId>
    <version>3.0.2</version>
</dependency>

配置文件:

# 数据库连接信息
spring.datasource.url=jdbc:mysql://localhost:3306/canal-demo?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
# Spring 服务名称
spring.application.name=canal-spring-boot-demo
# canal的服务地址
canal.server=127.0.0.1:11111
# 需要监控的实例
canal.destination=example
# 关闭日志,不然一秒打印一个日志,浪费空间
logging.level.top.javatool.canal.client=OFF

首先新建一个实体类User对应数据库中的字段

@Data
public class User {
    private Integer id;
    private String username;
    private Integer age;
}

然后书写一个CanalHandler实现接口EntryHandler<T>,在里面对insert、update、delete这几种操作加入自己的处理

import com.example.springdemo.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@Component
@CanalTable("user")
@Slf4j
public class CanalHandler implements EntryHandler<User> {
    @Override
    public void insert(User user) {
        log.info("插入用户:{}", user.toString());
    }

    @Override
    public void update(User before, User after) {
        log.info("用户修改前:{}", before.toString());
        log.info("用户修改后:{}", after.toString());
    }

    @Override
    public void delete(User user) {
        log.info("删除用户:{}", user.toString());
    }
}

运行结果
在这里插入图片描述

总结

通过Canal可以没有侵入的,即时的将数据库的改动同步到Redis、ElasticSearch或者其他数据存储库中,如果是在大数据方面需要数据聚合的话,推荐使用Flink CDC。目前Canal还有一个问题就是似乎不再维护了,但还是为我们提供了一个轻量化的数据迁移、同步工具。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/701451.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CSS知识点汇总(十一)--回流重绘

文章目录 怎么理解回流跟重绘&#xff1f;什么场景下会触发&#xff1f;1、回流和重绘是什么&#xff1f;2、如何触发回流和重绘3、如何避免回流和重绘的发生 怎么理解回流跟重绘&#xff1f;什么场景下会触发&#xff1f; 1、回流和重绘是什么&#xff1f; 在HTML中&#xf…

二十、socket套接字编程(二)——TCP

文章目录 一、TCP套接字接口&#xff08;一&#xff09;inet_aton &#xff08;和inet_addr一样&#xff0c;换一种方式而已&#xff09;&#xff08;二&#xff09;socket()&#xff08;三&#xff09;bind()&#xff08;四&#xff09;listen()&#xff08;五&#xff09;acc…

ASP.NET Core MVC -- 入门

先决条件&#xff08;开发配置二选一&#xff09;&#xff1a; 带有 ASP.NET 和 Web 开发工作负载的Visual Studio Visual Studio Code Visual Studio Code用于 Visual Studio Code 的 C#&#xff08;最新版本&#xff09;.NET 7.0 SDK 创建Web应用 visual studio ctrl F5 …

云原生之深入解析Kubernetes网络流量的流转路径

一、Kubernetes 网络要求 Kubernetes 网络模型定义了一组基本规则&#xff1a; 在不使用网络地址转换 (NAT) 的情况下&#xff0c;集群中的 Pod 能够与任意其他 Pod 进行通信&#xff1b; 在不使用网络地址转换 (NAT) 的情况下&#xff0c;在集群节点上运行的程序能与同一节点…

王道《计算机网络》思维导图汇总

第一章 1.1.1 概念与功能 1.1.2 组成与分类 1.1.3 标准化工作及相关组织 1.1.4 性能指标 速率 带宽 吞吐量 时延 时延带宽积 往返时延RTT 利用率 1.2.1 分层结构、协议、接口、服务 1.2.2 OSI参考模型 应用层 表示层 会话层 传输层 网络层 数据链路层 物理层 1.2.4 TCP/IP 参…

内核角度看IO模型

聊聊Netty那些事儿之从内核角度看IO模型 网络包接收流程 当网络数据帧通过网络传输到达网卡时&#xff0c;网卡会将网络数据帧通过DMA的方式放到环形缓冲区RingBuffer中。RingBuffer是网卡在启动的时候分配和初始化的环形缓冲队列。当RingBuffer满的时候&#xff0c;新来的数据…

【AUTOSAR】BMS开发实际项目讲解(十三)----电池管理系统碰撞安全功能和SFR

SG-BMS-7 : BMS系统应避免碰撞保护功能异常引起的安全事故&#xff08;ASIL A&#xff09; 功能框图&#xff08;SG-BMS-7&#xff09; 功能组件说明 功能组件ID 功能组件名称 描述 ASIL等级 FSC-FC-05 Relay Drive 驱动继电器开启和关断 ASIL A FSC-FC-11 Detection …

【vue】可选链运算符(?.)和空值合并运算符(??):

文章目录 一、问题一:二、问题二:三、使用:【1】空值合并运算符&#xff08;??&#xff09;【2】可选链运算符&#xff08;?.&#xff09; 一、问题一: http://www.codebaoku.com/question/question-sd-1010000042870944.html //1、npm安装 npm install babel/plugin-propo…

批量修改文件命名的shell脚本

Android 制作开机动画的方法参考&#xff1a;linux开机动画制作教程 其中往往会把里面的png图片命名位XX_0001.png , 002.png……等 Window批量修改文件名时会带有空格和括号。 这里写了一个脚本&#xff0c;可以在批量修改文件名后&#xff0c;将文件名转换为XX_00001 格式&…

基于matlab使用 YOLO V2深度学习进行多类对象检测(附源码)

一、前言 此示例演示如何训练多类对象检测器。 深度学习是一种强大的机器学习技术&#xff0c;可用于训练强大的多类对象检测器&#xff0c;例如 YOLO v2、YOLO v4、SSD 和 Faster R-CNN。此示例使用该函数训练 YOLO v2 多类室内对象检测器。经过训练的物体检测器能够检测和识…

ModaHub魔搭社区:Milvus 监控指标和使用 Grafana 展示 Milvus 监控指标

目录 Milvus 监控指标 Milvus 性能指标 系统运行指标 硬件存储指标 Milvus 监控指标 Milvus 会生成关于系统运行状态的详细时序 metrics。你可以通过 Prometheus、Grafana 或任何可视化工具展现以下指标&#xff1a; Milvus 性能指标系统运行指标&#xff1a;CPU/GPU 使用…

单片机学习 11-中断系统(定时器中断+外部中断)

中断系统 中断介绍 ​ 中断是为使单片机具有对外部或内部随机发生的事件实时处理而设置的&#xff0c;中断功能的存在&#xff0c;很大程度上提高了单片机处理外部或内部事件的能力。它也是单片机最重要的功能之一&#xff0c;是我们学习单片机必须要掌握的。很多初学者被困在…

全网超全,Pytest自动化测试框架pytest-xdist分布式测试插件(实战)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 平常我们功能测试…

Markdown 扩展语法

✅作者简介&#xff1a;人工智能专业本科在读&#xff0c;喜欢计算机与编程&#xff0c;写博客记录自己的学习历程。 &#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&…

Python在命令行模式下如何退出命令行

文章目录 python退出命令行模式结语 刚学习python的时候是用的命令行的方式&#xff0c;刚接触不知道如何退出命令行&#xff0c;百度参考了好几篇文章&#xff0c;这里记录一下&#xff0c;希望能帮助到有需要的小伙伴们。 python退出命令行模式 总结一下&#xff0c;共有三种…

Redis处理⾼并发 实现分布式锁

Redisson Redisson是架设在Redis基础上的⼀个Java驻内存数据⽹格&#xff08;In-Memory Data Grid&#xff09;。 Redisson在基于NIO的Netty框架上&#xff0c;充分的利⽤了Redis键值数据库提供的⼀系列优势&#xff0c;在Java实⽤⼯具包中常⽤ 接⼝的基础上&#xff0c;为使⽤…

高压功率放大器在光学测量中的应用有哪些

高压功率放大器在光学测量中有许多应用&#xff0c;例如在激光器和LED驱动、光电探测器和光电转换器中等。这些应用大多需要将输入信号放大到高电平输出&#xff0c;以便驱动高电压或大功率负载。 在激光器和LED驱动应用中&#xff0c;高压功率放大器可以将低电平的控制信号放大…

nginx纳入skywalking调用链监控

nginx纳入skywalking调用链监控 一、说明二、nginx部署2.1 OpenResty介绍2.2 准备SkyWalking Nginx Agent2.3 docker方式部署OpenResty2.3.1 修改配置文件2.3.2 启动OpenResty容器 2.4 验证 一、说明 服务器中已部署好skywalking&#xff0c;并将tomcat纳入skywalking监控(tom…

JavaSE基础语法--接口

接口在现实生活中比比皆是。比如电脑的USB接口&#xff0c;插座的接口。这些接口我们发现都是一样的规范。比如插座的有双孔插&#xff0c;有三孔插。那么对应就有双脚设备&#xff0c;和三脚的设备。从这我们就能摸清楚规律&#xff1a;接口就是统一规范的提供服务。Java中接口…

七年老Android推荐 : 日常开发中好用的工具 (二)

1. 前言 作为一名拥有七年经验的Android开发工程师&#xff0c;在日常开发中&#xff0c;总希望能提升自己的开发效率&#xff0c;对此也积累了一些工具&#xff0c;本文对此总结了一些好用的工具。 2. draw.io draw.io用来编写流程图非常好用&#xff0c;是一个免费的在线图…