如何在 Java 中使用 Canal 同步 MySQL 数据到 Redis

news2024/11/14 1:35:04

文章目录

  • 一、引言
  • 二、工作原理
    • 1. MySQL主备复制原理
    • 2. canal 工作原理
  • 三、环境准备
    • 1. 安装和配置 MySQL
    • 2. 安装和配置 Canal
    • 3. 安装和配置 Redis
  • 四、开发 Java 应用
    • 1. 添加依赖
    • 2. 编写 Canal 客户端代码
    • 3. 运行和测试
      • 3.1 启动 Canal 服务:
      • 3.2 启动 Redis 服务:
      • 3.3 启动 Java 应用:
      • 3.4 测试数据同步:
  • 五、注意事项
  • 六、结论
  • 七、参考资料

一、引言

在现代微服务架构中,数据同步是一个常见的需求。特别是将 MySQL 数据实时同步到 Redis,可以显著提升应用的性能和响应速度。本文将详细介绍如何使用 Canal 实现这一目标。Canal 是阿里巴巴开源的一个数据库 Binlog 同步工具,可以实时捕获 MySQL 的 Binlog 日志并将其同步到其他存储系统。
项目地址:alibaba/canal

二、工作原理

1. MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

2. canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

三、环境准备

1. 安装和配置 MySQL

Canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row。编辑 MySQL 配置文件 my.cnf 或 my.ini,添加或修改以下内容:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

重启 MySQL 服务以使配置生效:

sudo service mysql restart

2. 安装和配置 Canal

下载并解压 Canal 服务端:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal
cd /opt/canal

编辑 Canal 配置文件 conf/example/instance.properties,配置 MySQL 服务器的相关信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*

启动 Canal 服务:

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

如果启动失败,注意检查配置文件conf/example/instance.properties的内容,还要注意JDK版本及配置。建议使用1.6.25。我用openjdk 21启动报错,改回JDK8u421启动成功。

3. 安装和配置 Redis

确保 Redis 服务已经安装并启动。可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

四、开发 Java 应用

1. 添加依赖

在你的 pom.xml 文件中添加 Canal 客户端和 Redis 客户端的依赖。以下是一个示例:

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.1.5</version>
    </dependency>
</dependencies>

2. 编写 Canal 客户端代码

创建一个 Java 类来连接 Canal 服务并处理 Binlog 事件,将数据同步到 Redis:

package org.hbin.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalToRedisSync {

    public static void main(String[] args) {
        // 创建 Canal 连接
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(address, "example", "", "");

        // 连接到 Canal 服务
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();

        // 创建 Redis 客户端
        Jedis jedis = new Jedis("127.0.0.1", 6379);

        while (true) {
            Message message = connector.getWithoutAck(100); // 获取最多 100 条记录
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                handleEntry(message.getEntries(), jedis);
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }
    }

    private static void handleEntry(List<CanalEntry.Entry> entries, Jedis jedis) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    syncDelete(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    syncInsert(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else {
                    System.out.println("-------> before");
                    syncUpdate(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                    System.out.println("-------> after");
                    syncUpdate(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                }
            }
        }
    }

    private static void syncInsert(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Insert: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncUpdate(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Update: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncDelete(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            }
        }
        System.out.println("Delete: " + key.toString());
        jedis.hdel(schema + ":" + table, key.toString());
    }
}

3. 运行和测试

3.1 启动 Canal 服务:

sh /opt/canal/bin/startup.sh

3.2 启动 Redis 服务:

确保 Redis 服务已经启动,可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

3.3 启动 Java 应用:

编译并运行上述 Java 应用,确保 Canal 服务和 MySQL 服务器正常运行。

3.4 测试数据同步:

在 MySQL 中插入、更新或删除数据,观察 Java 应用是否能够实时捕获这些变化并将数据同步到 Redis。
相关SQL如下:

drop database if exists canal;
create database canal;
use canal;

drop table if exists user;
create table user(
  `id` bigint AUTO_INCREMENT primary key,
  `name` varchar(20) NOT NULL,
  `age` tinyint DEFAULT 0,
  `detail` varchar(100) DEFAULT '',
  `create_time` date,
  `update_time` date
);

insert into user value(1, 'Tom1', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(2, 'Tom2', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(3, 'Tom3', 25, 'canal', '2024-11-07', '2024-11-07');
update user set age=26 where id=2;
delete from user where id=3;

输出信息:

================> binlog[binlog.000008:6390] , name[canal,user] , eventType : CREATE
================> binlog[binlog.000008:6899] , name[canal,user] , eventType : INSERT
Insert: 1 -> name:Tom1,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7213] , name[canal,user] , eventType : INSERT
Insert: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7527] , name[canal,user] , eventType : INSERT
Insert: 3 -> name:Tom3,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7850] , name[canal,user] , eventType : UPDATE
-------> before
Update: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
-------> after
Update: 2 -> name:Tom2,age:26,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:8193] , name[canal,user] , eventType : DELETE
Delete: 3

五、注意事项

  • 性能优化:根据实际需求调整 Canal 和 Redis 的配置,以优化性能。
  • 错误处理:在生产环境中,需要增加错误处理和重试机制,确保数据同步的可靠性。
  • 安全性:确保 Canal 和 Redis 的连接是安全的,使用适当的认证和授权机制。

六、结论

通过使用 Canal,我们可以轻松地将 MySQL 数据实时同步到 Redis、Kafka 或其他系统。这不仅提高了数据的一致性和实时性,还为应用提供了更高的性能和响应速度。希望本文对你有所帮助。

七、参考资料

  • canal QuickStart
  • canal ClientExample

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2238365.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ThinkBook 14+ 2024 Ubuntu 触控板失效 驱动缺失问题解决

首先我的电脑是thinkbook14 2024&#xff0c;从ubuntu18到ubuntu24&#xff0c;笔者整个都试了一遍&#xff0c;触摸板都没反应&#xff0c;确认不是linux系统内核问题&#xff0c;原因为驱动缺失。 解决步骤&#xff1a; &#xff08;1&#xff09;下载驱动&#xff0c;网址如…

如何使用 Web Scraper API 高效采集 Facebook 用户帖子信息

目录 前言一、什么是Web Scraper API二、Web Scraper API 的优势&#xff1a;三、Web Scraper API 适用场景四、实践案例目标需求视频讲解1、选择Web Scraper API2、登录注册3、进入用户控制面板4、选择API5、触发数据收集 API6、获取爬虫结果7、分析爬虫结果&#xff08;1&…

Qt_day3_信号槽

目录 信号槽 1. 概念 2. 函数原型 3. 连接方式 3.1 自带信号 → 自带槽 3.2 自带信号 → 自定义槽 3.3 自定义信号 4. 信号槽传参 5. 对应关系 5.1 一对多 5.2 多对一 信号槽 1. 概念 之前的程序界面只能看&#xff0c;不能交互&#xff0c;信号槽可以让界面进行人机…

Elastic 通用分析:提高性能并降低成本

作者&#xff1a;来自 Elastic Luca Wintergerst•Tim Rhsen 在这篇博客中&#xff0c;我们将介绍我们的一位工程师的一项发现如何帮助我们在 QA 环境中节省数千美元的成本&#xff0c;并且一旦我们将这一变化部署到生产中&#xff0c;还可以节省更多的成本。 在当今的云服务和…

【WRF理论第十一期】检查WPS输出:geogrid和metgrid 的输出nc数据+ungrib输出WPS格式

【WRF理论第十一期】检查WPS输出&#xff1a;geogrid和metgrid输出nc数据ungrib输出WPS格式 检查WPS输出WPS 输出检查的重要性使用 NetCDF 格式查看 geogrid 和 metgrid 的输出检查和可视化数据的工具 ungrib 输出数据的格式使用 plotfmt 工具查看 ungrib 输出 参考 上一篇博客…

万字长文解读深度学习——卷积神经网络CNN

推荐阅读&#xff1a; 卷积神经网络&#xff08;CNN&#xff09;详细介绍及其原理详解 CNN笔记&#xff1a;通俗理解卷积神经网络 文章目录 &#x1f33a;深度学习面试八股汇总&#x1f33a;主要组件输入层卷积层 (Convolutional Layer)批归一化层&#xff08;Batch Normalizat…

Redis生产问题(缓存穿透、击穿、雪崩)——针对实习面试

目录 Redis生产问题什么是缓存穿透&#xff1f;如何解决缓存穿透&#xff1f;什么是缓存击穿&#xff1f;如何解决缓存击穿&#xff1f;缓存穿透和缓存击穿有什么区别&#xff1f;什么是缓存雪崩&#xff1f;如何解决缓存雪崩&#xff1f; Redis生产问题 什么是缓存穿透&#x…

19、centos7优化

优化条目&#xff1a; 优化条目&#xff1a; 1.sudo管理用户授权 &#xff08;不用root管理,以普通用户的名义通过sudo提权&#xff09; 2.更改默认的远程连接SSH服务端口,禁止root用户远程连接,&#xff08;提前建立普通用户&#xff09;&#xff08;甚至更改为只监听内网IP…

河北省内首台心磁图仪正式落户河北梅奥心血管病医院

河北省内首台心磁图仪正式落户河北梅奥心血管病医院。 2024年11月9日&#xff0c;河北梅奥心血管病医院迎来了一场激动人心的历史时刻——河北省首台心磁图仪启用仪式在医院内隆重举行&#xff0c;标志着这一顶尖医疗设备正式入驻&#xff0c;为医院心脏影像诊断技术开启了全新…

【C语言刷力扣】283.移动零

题目&#xff1a; 解题思路&#xff1a; 将不为 0 的元素依次放在数组前面&#xff0c;再在数组末尾补上 0。 时间复杂度&#xff1a; 空间复杂度&#xff1a; void moveZeroes(int* nums, int numsSize) {int i 0, j 0;for (; i < numsSize; i) {if (nums[i]) {nums…

网络初阶——应用层:HTTPS 协议

一、HTTPS & HTTP 的区别 从协议的名字来看&#xff0c;HTTP 比 HTTPS 少了一个 S。而这个 “S”&#xff0c;其实可以理解成 “Safe”&#xff0c;所以不难看出&#xff0c;其实 HTTPS 就是 HTTP 的安全版。就是为了保证客户端 cookie 的传输安全的。 二、相关概念 1、明…

怎么禁止Ubuntu自动更新升级

怎么禁止Ubuntu自动更新升级 笔者在做MIT 6.S081的时候发现他给我的qemu自动更新了又卡住了&#xff0c;故关闭了自动更新 文章目录 怎么禁止Ubuntu自动更新升级一、图形化修改二、基于命令行修改配置文件的方法 一、图形化修改 1.打开设置->软件和更新->更新 2.选择自…

Spring Boot框架:构建符合工程认证的计算机课程

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…

机器学习—选择激活函数

可以为神经网络中的不同神经元选择激活函数&#xff0c;我们将从如何为输出层选择它的一些指导开始&#xff0c;事实证明&#xff0c;取决于目标标签或地面真相标签y是什么&#xff0c;对于输出层的激活函数&#xff0c;将有一个相当自然的选择&#xff0c;然后看看激活函数的选…

【学习记录】使用CARLA录制双目摄像头SLAM数据

一、数据录制 数据录制的部分参考了网上的部分代码&#xff0c;代码本身并不复杂&#xff0c;基本都是简单的CARLA语法&#xff0c;关键的一点在于&#xff0c;CARLA内部本身并没有预设的双目摄像头&#xff0c;需要我们添加两个朝向相同的摄像头来组成双目系统&#xff0c;这…

[论文粗读][REALM: Retrieval-Augmented Language Model Pre-Training

引言 今天带来一篇检索增强语言模型预训练论文笔记——REALM: Retrieval-Augmented Language Model Pre-Training。这篇论文是在RAG论文出现之前发表的。 为了简单&#xff0c;下文中以翻译的口吻记录&#xff0c;比如替换"作者"为"我们"。 语言模型预训练…

【人工智能】ChatGPT多模型感知态识别

目录 ChatGPT辅助细化知识增强&#xff01;一、研究背景二、模型结构和代码任务流程一&#xff1a;启发式生成 三、数据集介绍三、性能展示实现过程运行过程训练过程 ChatGPT辅助细化知识增强&#xff01; 多模态命名实体识别&#xff08;MNER&#xff09;最近引起了广泛关注。…

【黑马点评debug日记】

q1:登录无session跳转主页 p30&#xff0c;页面登录后返回&#xff0c;然后点击我的&#xff0c;需要重新设置&#xff0c;拦截器都没有问题。 参考&#xff1a; redis 黑马点评p30 login没有正常跳转&#xff0c;修改前端代码后还是一直跳转主界面_黑马点评登录后跳转到主页…

地面远阴影对光伏电站的影响

影响因素 1、太阳高度角和方位角 太阳高度角是指太阳光的入射方向和地平面之间的夹角。太阳高度角随时间、季节和地理位置的变化而变化。 方位角是指太阳光线在水平面上的投影与正南方向的夹角。方位角也随时间和地理位置的变化而变化。 可以通过天文公式或者专业的太阳位置…

消息队列高级

目录 消息可靠性 生产者消息确认 第一步&#xff1a;修改application.yml配置文件信息 第二步&#xff1a;定义发送者确认confirm回调方法 第三步&#xff1a;创建消息发送者回执return回调方法&#xff08;确保消息从交换机到消息队列&#xff09; 总结&#xff1a; 消息持…