【canal 中间件】canal 实时监听 binlog

news2025/1/16 20:02:17

文章目录

    • 一、安装 MySQL
      • 1.1 启动 mysql 服务器
      • 1.2 开启 Binlog 写入功能
        • 1.2.1创建 binlog 配置文件
        • 1.2.2 修改配置文件权限
        • 1.2.3 挂载配置文件
        • 1.2.4 检测 binlog 配置是否成功
      • 1.3 创建账户并授权
    • 二、安装 canal
      • 2.1 安装 canal-admin(可选)
        • 2.1.1 启动 canal-admin 容器
        • 2.1.2 访问页面
      • 2.2 安装 canal-server
        • 2.2.1 启动 canal 容器
        • 2.2.2 查看启动日志
    • 三、客户端代码
      • 3.1 导入依赖
      • 3.2 简单案例代码
    • 四、测试
      • 4.1 创建数据库及表
      • 4.2 插入数据
      • 4.3 更新数据
    • 参考资料

完整案例代码:java-demos/middleware-demos/spring-boot-canal at main · idealzouhu/java-demos

一、安装 MySQL

QuickStart · alibaba/canal Wiki (github.com)

1.1 启动 mysql 服务器

docker run --name mysql-canal ^
-p 3306:3306 ^
-e MYSQL_ROOT_PASSWORD=root ^
-d mysql:5.7.36

1.2 开启 Binlog 写入功能

对于自建 MySQL容器 , 我们需要开启 Binlog 写入功能。

1.2.1创建 binlog 配置文件

在宿主机上创建 my.cnf 文件,配置 binlog-format 为 ROW 模式。my.cnf 的配置内容如下:

[mysqld]
# 开启 binlog
log-bin=mysql-bin 
# 选择 ROW 模式
binlog-format=ROW 
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1 
1.2.2 修改配置文件权限

进入 MySQL 容器并修改 MySQL 容器配置文件 /etc/mysql/my.cnf 权限,以避免权限警告:

# 进入 MySQL 容器
$ docker exec -it mysql-canal bash

# 修改文件权限
$ chmod 644 /etc/mysql/my.cnf
$ exit

注意,在没有修改配置文件并启动 MySQL 容器情况下,MySQL 会警告配置文件 /etc/mysql/my.cnf 权限设置不当,允许所有用户写入(world-writable)。由于安全原因,MySQL 会忽略这个配置文件

[Warning] World-writable config file '/etc/mysql/my.cnf' is ignored.
1.2.3 挂载配置文件

在 MySQL 容器运行后,使用以下命令将创建的 my.cnf 文件挂载到容器内的 /etc/mysql/my.cnf

# 将本地的 my.cnf 文件复制到容器的指定目录下
$ docker cp D:\Learning\java-demos\middleware-demos\spring-boot-canal\src\main\resources\conf\my.cnf mysql-canal:/etc/mysql/

# 为了使新的配置生效,重启 MySQL 容器
$ docker restart mysql-canal

注意,MySQL 容器的 /etc/mysql/my.cnf 是一个符号链接,直接指定完整路径时会导致问题。

MySQL 启动时会首先加载主配置文件 /etc/mysql/my.cnf,然后加载 conf.d 目录下的所有配置文件。

1.2.4 检测 binlog 配置是否成功

进入 MySQL, 利用 show variables like 'log_bin'; 查看是否打开 binlog 模式:

$ docker exec -it mysql-canal bash

# 查看挂载后的 my.cnf 文件
$ tail /etc/mysql/my.cnf

# 查看 binlog 是否开启
$ mysql -uroot -proot
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)

# 查看 binlog 日志文件列表
mysql> show binary logs;

# 查看正在写入的 binlog 文件
mysql> show master status;

# 查看 Binlog 文件内容
mysql> mysqlbinlog /var/lib/mysql/mysql-bin.000001


1.3 创建账户并授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

# 进入 mysql 容器
$ docker exec -it mysql-canal mysql -uroot -proot

# 创建用户名和密码都为 canal 的账户
mysql> CREATE USER canal IDENTIFIED BY 'canal';

# 授予权限 GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

二、安装 canal

canal-server 和 canal-admin 在 Docker 里面的详细教程查看 Docker QuickStart · alibaba/canal Wiki 和 Canal Admin Docker · alibaba/canal Wiki。

2.1 安装 canal-admin(可选)

2.1.1 启动 canal-admin 容器
$ docker pull canal/canal-admin:v1.1.7

$ docker run -d ^
--name canal-admin ^
--privileged=true ^
--restart always ^
-p 8089:8089 ^
-e server.port=8089 ^
-e canal.adminUser=admin ^
-e canal.adminPasswd=admin ^
-m 512m ^
canal/canal-admin:v1.1.7

在 canal 启动成功后,查看 admin 日志

2024-10-28 21:35:01 DOCKER_DEPLOY_TYPE=VM
2024-10-28 21:35:01 ==> INIT /alidata/init/02init-sshd.sh
2024-10-28 21:35:01 ==> EXIT CODE: 0
2024-10-28 21:35:01 ==> INIT /alidata/init/fix-hosts.py
2024-10-28 21:35:01 ==> EXIT CODE: 0
2024-10-28 21:35:01 ==> INIT DEFAULT
2024-10-28 21:35:01 ==> INIT DONE
2024-10-28 21:35:01 ==> RUN /home/admin/app.sh
2024-10-28 21:35:01 ==> START ...
2024-10-28 21:35:01 Failed to get D-Bus connection: Operation not permitted
2024-10-28 21:35:01 Failed to get D-Bus connection: Operation not permitted
2024-10-28 21:35:01 start mysql ...
2024-10-28 21:35:10 mysql: [Warning] Using a password on the command line interface can be insecure.
2024-10-28 21:35:10 start mysql successful
2024-10-28 21:35:10 start admin ...
2024-10-28 21:35:15 start canal successful
2024-10-28 21:35:15 ==> START SUCCESSFUL ...
2.1.2 访问页面

访问 http://127.0.0.1:8089/ ,默认账号密码为 admin/123456

在这里插入图片描述

2.2 安装 canal-server

2.2.1 启动 canal 容器
$ docker pull canal/canal-server:v1.1.7

$ docker run -d ^
  --name canal-server ^
  --restart always ^
  -p 11111:11111 ^
  --privileged=true ^
  -e canal.destinations=test ^
  -e canal.instance.mysql.slaveId=1234  ^
  -e canal.instance.master.address=172.17.0.4:3306 ^
  -e canal.instance.dbUsername=canal ^
  -e canal.instance.dbPassword=canal ^
  -e canal.instance.connectionCharset=UTF-8 ^
  -e canal.instance.tsdb.enable=true ^
  -e canal.instance.gtidon=false ^
  -e canal.instance.filter.regex=.\*\\\\..\* ^
  -m 4096m ^
  canal/canal-server:v1.1.7

2.2.2 查看启动日志

在 canal 启动成功后,查看启动日志

2024-10-28 21:29:00 DOCKER_DEPLOY_TYPE=VM
2024-10-28 21:29:00 ==> INIT /alidata/init/02init-sshd.sh
2024-10-28 21:29:00 ==> EXIT CODE: 0
2024-10-28 21:29:00 ==> INIT /alidata/init/fix-hosts.py
2024-10-28 21:29:00 ==> EXIT CODE: 0
2024-10-28 21:29:00 ==> INIT DEFAULT
2024-10-28 21:29:00 ==> INIT DONE
2024-10-28 21:29:00 ==> RUN /home/admin/app.sh
2024-10-28 21:29:01 ==> START ...
2024-10-28 21:29:01 start canal ...
2024-10-28 21:29:00 Failed to get D-Bus connection: Operation not permitted
2024-10-28 21:29:00 Failed to get D-Bus connection: Operation not permitted
2024-10-28 21:29:36 start canal successful
2024-10-28 21:29:36 ==> START SUCCESSFUL ...

看到 successful 之后,就代表 canal-server 启动成功,然后就可以在 canal-admin 上进行任务分配了。

三、客户端代码

3.1 导入依赖

创建 Spring Boot 项目,并导入以下依赖。

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.7</version>
</dependency>

<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.7</version>
</dependency>

3.2 简单案例代码

编写简单的案例打印 canal server 解析 binlog 获得的实体类信息, 具体代码如下:

public class SimpleCanalClientExample {
    /**
     * 主函数入口
     * <p>
     *     建立与Canal服务器的连接,订阅数据库变化,并处理接收到的消息
     * </p>
     *
     * @param args 命令行参数
     */
    public static void main(String[] args) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
                11111), "test", "", "");
        // 批处理大小,即每次获取的最大消息数量
        int batchSize = 1000;
        // 连续接收到空消息的次数
        int emptyCount = 0;
        try {
            // 建立连接
            connector.connect();
            // 订阅所有数据库和表的变化
            connector.subscribe(".*\\..*");
            // 回滚到未确认的消息
            connector.rollback();
            // 最大连续接收到空消息的次数
            int totalEmptyCount = 1200;
            // 循环获取消息,直到连续接收到空消息的次数超过totalEmptyCount
            while (emptyCount < totalEmptyCount) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                // 获取消息ID
                long batchId = message.getId();
                // 获取消息中的数据条目数量
                int size = message.getEntries().size();
                // 如果消息ID为-1或数据条目数量为0,则增加空消息计数
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    // 空消息过多时休眠1秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    // 如果接收到数据,则重置空消息计数
                    emptyCount = 0;
                    // 打印接收到的消息信息
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
                // 处理失败, 回滚数据
                // connector.rollback(batchId);
            }
            // 如果连续接收到的空消息次数过多,则退出
            System.out.println("empty too many times, exit");
        } finally {
            // 断开连接
            connector.disconnect();
        }
    }

    /**
     * 打印 canal server 解析 binlog 获得的实体类信息
     * <p>
     *     遍历给定的 entry 列表,解析并打印每个 entry 的详细信息除非 entry 类型是事务开始或结束
     *     对于非事务 entry,解析其存储值为 RowChange 对象,并根据事件类型打印变更信息
     * </p>
     *
     * @param entrys 条目列表,代表一系列数据库变更事件
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            // 跳过事务开始和事务结束的 entry
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                // 从 entry 的存储值中解析出 RowChange 对象
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                // 如果解析失败,抛出运行时异常,并提供错误信息和原始异常
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            // 获取事件类型,并打印 entry 的基本信息
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            // 遍历 RowChange 中的所有行数据,并根据事件类型打印相应的列信息
            for (RowData rowData : rowChage.getRowDatasList()) {
                switch (eventType) {
                    case DELETE:
                        // 对于删除事件,打印行数据的"之前"状态
                        printColumn(rowData.getBeforeColumnsList());
                        break;
                    case UPDATE:
                        // 对于插入事件,打印行数据的"之后"状态
                        printColumn(rowData.getAfterColumnsList());
                        break;
                    default:
                        // 对于其他事件类型,打印行数据的"之前"和"之后"状态
                        System.out.println("-------&gt; before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------&gt; after");
                        printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }


    /**
     * 打印列信息
     * 此方法接收一个Column对象列表作为参数,并遍历该列表,将每个Column对象的名称、值和更新状态打印到控制台
     * 主要用途是用于调试或日志记录,以直观地展示每个列的信息及其更新状态
     *
     * @param columns Column对象列表,包含要打印的列信息每个Column对象都应提供getName、getValue和getUpdated方法
     */
    private static void printColumn(List<Column> columns) {
        // 遍历Column对象列表
        for (Column column : columns) {
            // 打印每个Column对象的名称、值和更新状态
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

四、测试

4.1 创建数据库及表

数据库变更:

CREATE DATABASE test_db;
USE test_db;
CREATE TABLE users (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100),
    email VARCHAR(100)
);

控制台输出:

================&gt; binlog[mysql-bin.000004:1234] , name[test_db,] , eventType : QUERY
================&gt; binlog[mysql-bin.000004:219] , name[test,users] , eventType : CREATE

4.2 插入数据

插入语句:

INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com');

控制台输出:

================&gt; binlog[mysql-bin.000004:595] , name[test,users] , eventType : INSERT
id : 1    update=true
name : Alice    update=true
email : alice@example.com    update=true

================&gt; binlog[mysql-bin.000004:883] , name[test,users] , eventType : INSERT
id : 2    update=true
name : Bob    update=true
email : bob@example.com    update=true

4.3 更新数据

更新语句:

UPDATE users SET email = 'newemail@example.com' WHERE name = 'Bob';

控制台输出:

================&gt; binlog[mysql-bin.000004:2370] , name[test_db,users] , eventType : UPDATE
id : 2    update=false
name : Bob    update=false
email : newemail@example.com    update=true

参考资料

ClientExample · alibaba/canal Wiki

超详细的canal入门,看这篇就够了-阿里云开发者社区 (aliyun.com)

【Canal】Canal Admin Docker部署 - H__D - 博客园

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2231905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在阿里云快速启动Umami玩转网页分析

阿里云计算巢提供了Umami快速部署能力&#xff0c;使用者不需要自己下载代码&#xff0c;不需要自己安装复杂的依赖&#xff0c;不需要了解底层技术&#xff0c;只需要在控制台图形界面点击几下鼠标就可以快速部署并启动Umami&#xff0c;非技术同学也能轻松搞定。 什么是Umam…

【模型学习之路】手写+分析bert

手写分析bert 目录 前言 架构 embeddings Bertmodel 预训练任务 MLM NSP Bert 后话 netron可视化 code2flow可视化 fine tuning 前言 Attention is all you need! 读本文前&#xff0c;建议至少看懂【模型学习之路】手写分析Transformer-CSDN博客。 毕竟Bert是tr…

stm32移植LVGL(LVGL 8.2.0)

目录 1.下载LVGL源码 2.修改LVGL文件夹 (1)文件夹 examples 改动 (2)文件夹 demos 改动 3.最终LVGL文件夹内容 4.软件Keil配置、添加头文件 5.程序配置 6.其它配置参考链接 1.下载LVGL源码 LVGL源码地址&#xff1a;https://github.com/lvgl/lvgl 2.修改LVGL文件夹…

海南华志亿星电子商务有限公司电商服务的璀璨新星

在这个全民直播、短视频带货风起云涌的时代&#xff0c;抖音电商以其独特的魅力成为了众多商家竞相追逐的蓝海市场。而在这片波澜壮阔的商海中&#xff0c;海南华志亿星电子商务有限公司犹如一颗璀璨的新星&#xff0c;以其专业的服务、创新的策略&#xff0c;为无数商家照亮了…

动手学深度学习66 使用注意力机制的seq2seq

1. 使用注意力机制的seq2seq key value等价 是一个东西 第i个词rnn的输出 根据加权的不同&#xff0c;解码器前面用编码器前面的输出&#xff0c;到后面用后面的输出。 2. code 核心代码: context 怎么算 embedding没变&#xff0c;Decoder加了attention层 class Seq2SeqAt…

高校大数据实训平台介绍

高校大数据实验室架构 具体实训平台介绍 编程实训平台 1、大数据开发实训平台 大数据开发实训平台是面向实训课和课后训练的编程实训平台&#xff0c;平台底层基于Docker技术&#xff0c;采用容器云部署方案&#xff0c;预装大数据相关课程教学所需的实训环境…

【快速上手】pyspark 集群环境下的搭建(Yarn模式)

目录 前言&#xff1a; 一、安装步骤 安装前准备 1.第一步&#xff1a;安装python 2.第二步&#xff1a;在bigdata01上安装spark 3.第三步&#xff1a;同步bigdata01中的spark到bigdata02和03上 二、启动 三、可打开yarn界面查看任务 前言&#xff1a; 上一篇介绍的是…

【ARM Linux 系统稳定性分析入门及渐进 1.2 -- Crash 工具依赖内容】

文章目录 Prerequisites1. 内核对象文件2. 内存镜像3. 平台处理器类型4. Linux 内核版本 Prerequisites crash 工具需要依赖下面的内容&#xff1a; 1. 内核对象文件 vmlinux 文件&#xff1a;需要一个 vmlinux 内核对象文件&#xff0c;在本文中称为命名列表&#xff08;na…

【Canal 中间件】Canal 实现 MySQL 增量数据的异步缓存更新

文章目录 一、安装 MySQL1.1 启动 mysql 服务器1.2 开启 Binlog 写入功能1.2.1创建 binlog 配置文件1.2.2 修改配置文件权限1.2.3 挂载配置文件1.2.4 检测 binlog 配置是否成功 1.3 创建账户并授权 二、安装 RocketMQ2.1 创建容器共享网络2.2 启动 NameServer2.3 启动 Broker2.…

Spring Boot2.x教程:(十)从Field injection is not recommended谈谈依赖注入

从Field injection is not recommended谈谈依赖注入 1、问题引入2、依赖注入的三种方式2.1、字段注入&#xff08;Field Injection&#xff09;2.2、构造器注入&#xff08;Constructor Injection&#xff09;2.3、setter注入&#xff08;Setter Injection&#xff09; 3、为什…

解决 ClickHouse 高可用集群中 VRID 冲突问题:基于 chproxy 和 keepalived 的实践分析

Part1背景描述 近期&#xff0c;我们部署了两套 ClickHouse 生产集群&#xff0c;分别位于同城的两个数据中心。这两套集群的数据保持一致&#xff0c;以便在一个数据中心发生故障时&#xff0c;能够迅速切换应用至另一个数据中心的 ClickHouse 实例&#xff0c;确保服务连续性…

【Android】View的事件分发机制

文章目录 分发顺序ActivityViewGroupView 协作方法整体流程注意 Activity事件分发ViewGroup事件分发View点击事件总结 分发顺序 Activity->ViewGroup->View Activity 分发事件&#xff1a;Activity 通过 dispatchTouchEvent 方法分发事件&#xff0c;首先尝试将事件传递…

java项目之微服务在线教育系统设计与实现(springcloud)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的闲一品交易平台。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 微服务在线教育系统设计与…

ChatGPT:真如吹的那般神乎其神吗?

ChatGPT的确是个神奇的东西。短短600多天&#xff0c;就已成全球访问量最大的网站之一。 ChatGPT已经出现在与这些大佬顶级大佬Google、Youtube、X.com、Baidu、Yahoo、amazon、Tiktok一起。 当然ChatGPT很优秀&#xff0c;这没有疑问&#xff0c;主要问题还是对度的把握上。…

【深度学习】实验 — 动手实现 GPT【二】:注意力机制、注意力掩码、多头注意力机制

【深度学习】实验 — 动手实现 GPT【二】&#xff1a;注意力机制、多头注意力机制 注意力机制简单示例&#xff1a;单个元素的情况简单示例&#xff1a;计算所有输入词元的注意力权重推广到所有输入序列词元&#xff1a; 注意力掩码代码实现多头注意力测试 注意力机制 简单示例…

简单的kafkaredis学习之kafka

简单的kafka&redis学习整理之kafka 1. kafka 1.1 什么是消息队列 在学习Kafka之前我们先来看一下什么是消息队列&#xff0c;消息队列(Message Queue)&#xff1a;可以简称为MQ 例如&#xff1a;Java中的Queue队列&#xff0c;也可以认为是一个消息队列 消息队列&#x…

基于人工智能的搜索和推荐系统

互联网上的搜索历史分析和用户活动是个性化推荐的基础&#xff0c;这些推荐已成为电子商务行业和在线业务的强大营销工具。随着人工智能的使用&#xff0c;在线搜索也在改进&#xff0c;因为它会根据用户的视觉偏好提出建议&#xff0c;而不是根据每个客户的需求和偏好量身定制…

ssm042在线云音乐系统的设计与实现+jsp(论文+源码)_kaic

摘 要 随着移动互联网时代的发展&#xff0c;网络的使用越来越普及&#xff0c;用户在获取和存储信息方面也会有激动人心的时刻。音乐也将慢慢融入人们的生活中。影响和改变我们的生活。随着当今各种流行音乐的流行&#xff0c;人们在日常生活中经常会用到的就是在线云音乐系统…

TVS 静电管 选型

参数选型举例: 静电管选型举例: 针对信号引脚一般只需ESD防护,关注其在IEC 61000−4−2波形下的测试结果:最大耐压值、钳位电压等,注意此时钳位电压的限值就不是Absolute maximum ratings值了,原因有2 1、Absolute maximum ratings值是指持续加压会损坏芯片 2、如果关…

监控调度台在交通运输行业的优势?

在当今快速发展的交通运输行业中&#xff0c;高效、安全的管理成为确保运营顺畅和乘客满意的关键。监控调度台作为这一领域的核心设备&#xff0c;正发挥着越来越重要的作用。它集成了视频监控、数据分析、实时通讯等多种功能&#xff0c;为交通运输行业带来了诸多优势。下面我…