【Canal】从原理、配置出发,从0到1完成Canal搭建

news2025/1/15 19:53:21

文章目录

    • 简介
    • 工作原理
      • MySQL主备复制原理
      • canal 工作原理
    • Canal架构
    • Canal-HA机制
    • 应用场景
      • 同步缓存 Redis /全文搜索 ES
      • 下发任务
      • 数据异构
    • MySQL 配置
      • 开启 binlog
      • 扩展
        • statement
        • row
        • mixed
      • 配置权限
    • Canal 配置
      • 配置
      • 启动
        • 报错
        • 解决
    • 实战
      • 引入依赖
      • 代码样例
      • 测试

前几天在网上冲浪的时候发现了一个比较成熟的开源中间件——Canal。在了解了它的工作原理和使用场景后,顿时产生了浓厚的兴趣。今天,就让我们跟随阿Q的脚步,一起来揭开它神秘的面纱吧。

简介

canal 翻译为管道,主要用途是基于 MySQL 数据库的增量日志 Binlog 解析,提供增量数据订阅和消费。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像;
  • 数据库实时备份;
  • 索引构建和实时维护(拆分异构索引、倒排索引等);
  • 业务 cache 刷新;
  • 带业务逻辑的增量数据处理;

当前的 canal 支持源端 MySQL 的版本包括 5.1.x,5.5.x,5.6.x,5.7.x,8.0.x。

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看);
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log);
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据;

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议;
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal );
  • canal 解析 binary log 对象(原始为 byte 流);

github地址:https://github.com/alibaba/canal

完整wiki地址:https://github.com/alibaba/canal/wiki

Canal架构

一个 server 代表一个 canal 运行实例,对应于一个 jvm,一个 instance 对应一个数据队列。

instance模块:

  • eventParser :数据源接入,模拟 slave 协议和 master 进行交互,协议解析;
  • eventSink :Parser 和 Store 链接器,进行数据过滤、加工、分发的工作;
  • eventStore :数据存储;
  • metaManager :增量订阅&消费信息管理器;

instance 是 canal 数据同步的核心,在一个 canal 实例中只有启动 instace 才能进行数据的同步任务。一个 canal server 实例中可以创建多个 Canal Instance 实例。每一个 Canal Instance 可以看成是对应一个 MySQL 实例。

Canal-HA机制

所谓 HA 即高可用,是 High Available 的简称。通常我们一个服务要支持高可用都需要借助于第三方的分布式同步协调服务,最常用的是zookeeper 。canal 实现高可用,也是依赖了zookeeper 的几个特性:watcher 和 EPHEMERAL 节点。

canal 的高可用分为两部分:canal server 和 canal client

  • canal server: 为了减少对 mysql dump 的请求,不同 server 上的 instance(不同 server 上的相同 instance)要求同一时间只能有一个处于 running,其他的处于 standby 状态,也就是说,只会有一个 canal server 的 instance 处于 active 状态,但是当这个 instance down 掉后会重新选出一个 canal server。
  • canal client: 为了保证有序性,一份 instance 同一时间只能由一个 canal client 进行 get/ack/rollback 操作,否则客户端接收无法保证有序。

server ha 的架构图如下:

大致步骤:

  1. canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断(实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动);
  2. 创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态。
  3. 一旦 zookeeper 发现 canal server A 创建的 instance 节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动 instance。
  4. canal client 每次进行 connect 时,会首先向 zookeeper 询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试 connect。

Canal Client 的方式和 canal server 方式类似,也是利用 zookeeper 的抢占 EPHEMERAL 节点的方式进行控制。

应用场景

同步缓存 Redis /全文搜索 ES

当数据库变更后通过 binlog 进行缓存/ES的增量更新。当缓存/ES更新出现问题时,应该回退 binlog 到过去某个位置进行重新同步,并提供全量刷新缓存/ES的方法。

下发任务

当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入 MQ/kafka 进行任务下发,比如商品数据变更后需要通知商品详情页、列表页、搜索页等相关系统。

这种方式可以保证数据下发的精确性,通过 MQ 发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发 MQ 的代码,从而实现了下发归集。

数据异构

在大型网站架构中,DB都会采用分库分表来解决容量和性能问题。但分库分表之后带来的新问题,比如不同维度的查询或者聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决此问题。

所谓的数据异构,那就是将需要 join 查询的多表按照某一个维度又聚合在一个 DB 中让你去查询,canal 就是实现数据异构的手段之一。

MySQL 配置

开启 binlog

首先在 mysql 的配置文件目录中查找配置文件 my.cnf(Linux环境)

[root@iZ2zebiempwqvoc2xead5lZ mysql]# find / -name my.cnf
/etc/my.cnf
[root@iZ2zebiempwqvoc2xead5lZ mysql]# cd /etc
[root@iZ2zebiempwqvoc2xead5lZ etc]# vim my.cnf

在 [mysqld] 区块下添加配置开启 binlog

server-id=1	#master端的ID号【必须是唯一的】;
log_bin=mysql-bin	#同步的日志路径,一定注意这个目录要是mysql有权限写入的
binlog-format=row	#行级,记录每次操作后每行记录的变化。
binlog-do-db=cheetah	#指定库,缩小监控的范围。

重启 mysql:service mysqld restart,会发现在 /var/lib/mysql 下会生成两个文件 mysql-bin.000001 和 mysql-bin.index,当 mysql 重启或到达单个文件大小的阈值时,新生一个文件,按顺序编号 mysql-bin.000002,以此类推。

扩展

binlog 日志有三种格式,可以通过 binlog_format 参数指定。

statement

记录的内容是 SQL语句 原文,比如执行一条 update T set update_time=now() where id=1,记录的内容如下

同步数据时,会执行记录的 SQL 语句,但是有个问题,update_time=now() 这里会获取当前系统时间,直接执行会导致与原库的数据不一致

row

为了解决上述问题,我们需要指定为 row,记录的内容不再是简单的 SQL 语句了,还包含操作的具体数据,记录内容如下。

row 格式记录的内容看不到详细信息,要通过 mysql binlog 工具解析出来。

update_time=now() 变成了具体的时间 update_time=1627112756247,条件后面的 @1、@2、@3 都是该行数据第1个~3个字段的原始值(假设这张表只有3个字段)。

这样就能保证同步数据的一致性,通常情况下都是指定为 row,这样可以为数据库的恢复与同步带来更好的可靠性。

缺点:占空间、恢复与同步时消耗更多的IO资源,影响执行速度。

mixed

MySQL 会判断这条 SQL 语句是否可能引起数据不一致,如果是,就用 row 格式,否则就用 statement 格式。

配置权限

CREATE USER canal IDENTIFIED BY 'XXXX';   #创建用户名和密码都为 canal 的用户
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; #授予该用户对所有数据库和表的查询、复制主节点数据的操作权限
FLUSH PRIVILEGES; #重新加载权限

注意:如果密码设置的过于简单,会报以下错误

ERROR 1819 (HY000): Your password does not satisfy the current policy requirements

MySQL 有密码设置的规范,可以自行百度😃。

Canal 配置

官网下载地址,我下载的版本是 canal.deployer-1.1.6.tar.gz,然后通过 psftp 上传到服务器。

解压:tar -zxvf canal.deployer-1.1.6.tar.gz

配置

通过查看 conf/canal.properties 配置,发现需要暴漏三个端口

canal.admin.port = 11110
canal.port = 11111
canal.metrics.pull.port = 11112

修改 conf/canal.properties 配置

# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example

修改 conf/example/instance.properties 实例配置

# 配置 slaveId 自定义,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10 

# 数据库地址:自己的数据库ip+端口
canal.instance.master.address=127.0.0.1:3306 
 
# 数据库用户名和密码 
canal.instance.dbUsername=xxx 
canal.instance.dbPassword=xxx

#代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
canal.instance.connectionCharset = UTF-8
	
# 指定库和表,这里的 .* 表示 canal.instance.master.address 下面的所有数据库
canal.instance.filter.regex=.*\\..*

如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

启动

需要在安装目录 /usr/local 下执行:sh bin/startup.sh 或者 ./bin/startup.sh

报错

发现在 logs 下没有生成 canal.log 日志,在进程命令中 ps -ef | grep canal 也查不到 canal 的进程。

解决

在目录 logs 中存在文件 canal_stdout.log ,文件内容如下:

报错信息提示内存不足,Java 运行时环境无法继续。更详细的错误日志在文件:/usr/local/bin/hs_err_pid25186.log 中。

既然是内存原因,那就检查一下自己的内存,执行命令free -h ,发现可用内存仅为 96M,应该是内存问题,解决方法如下:

  • 杀死运行的一些进程;
  • 增加虚拟机的内存;
  • 修改 canal 启动时所需要的内存;

我就是用的第三种方法,首先用 vim 打开 startup.sh 修改内存参数,可以对照我的进行修改,按照自己服务器剩余内存进行修改,这里我将内存调整到了 80M。

改为
-server -Xms80m -Xmx80m -Xmn80m -XX:SurvivorRatio=2 -XX:PermSize=66m -XX:MaxPermSize=80m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError

改完之后执行命令发现依旧报错:found canal.pid , Please run stop.sh first ,then startup.sh 意思是找到了 canal.pid,请先运行stop.sh。

这是由于 canal 服务不正常退出服务导致的,比如说虚拟机强制重启。

执行 stop.sh 命令后重新启动,成功运行,成功运行后可以在 canal/logs 文件夹中生成 canal.log 日志。

实战

引入依赖

<dependency>
	<groupId>com.alibaba.otter</groupId>
	<artifactId>canal.client</artifactId>
	<version>1.1.0</version>
</dependency>

代码样例

代码样例来自官网,仅用于测试使用

public class SimpleCanalClientExample {
    public static void main(String args[]) {
        // 创建链接:换成自己的数据库ip地址
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

测试

启动项目,打印日志

empty count : 1
empty count : 2
empty count : 3
empty count : 4

手动修改数据库中的字段:

================&gt; binlog[mysql-bin.000002:8377] , name[cheetah,product_info] , eventType : UPDATE
-------&gt; before
id : 3    update=false
name : java开发1    update=false
price : 87.0    update=false
create_date : 2021-03-27 22:43:31    update=false
update_date : 2021-03-27 22:43:34    update=false
-------&gt; after
id : 3    update=false
name : java开发    update=true
price : 87.0    update=false
create_date : 2021-03-27 22:43:31    update=false
update_date : 2021-03-27 22:43:34    update=false

可以看出是在 mysql-bin.000002文件中,数据库名称 cheetah ,表名 product_info,事件类型:update。

看完上面,再看看我为了写这篇文章又日渐稀少的头发,我忍不住哭出声来。可能只有给我点赞,才能平复我的心情吧。

好看的皮囊千篇一律,有趣的灵魂万里挑一,让我们在冷漠的城市里相互温暖,我是阿Q,我们下期再见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/711962.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MYSQL03高级_新增用户、授予权限、授权底层表结构、角色理解

文章目录 ①. 登录服务器操作②. 用户的增删改③. 修改用户密码④. MySQL8密码管理⑤. 权限列表及原则⑥. 授予查看回收权限⑦. 底层权限表操作⑧. 角色的理解 ①. 登录服务器操作 ①. 启动MySQL服务后,可以通过mysql命令来登录MySQL服务器,命令如下: mysql –h hostname|hos…

chatgpt赋能python:搜索Python答案的软件

搜索Python答案的软件 介绍&#xff1a;什么是搜索Python答案的软件&#xff1f; 搜索Python答案的软件是一种工具&#xff0c;可以帮助编程人员快速地找到他们在编写Python代码时遇到的问题的答案。这种软件可以搜索各种不同的网站&#xff0c;以帮助用户找到最适合他们问题…

实例006 菜级联菜单

实例说明 如果管理程序功能菜单非常多&#xff0c;一些功能中又包括许多子功能&#xff0c;这时可以使用级联菜单来组织系统的各个功能。实例运行结果如图1.6所示。 图1.6 级联菜单 技术要点 制作级联菜单需要使用MenuStrip控件。 注意&#xff1a;在使用级联菜单时最好不要…

Redis三种模式——主从复制、哨兵、集群

目录 一、概述 二、 Redis 主从复制 1.主从复制的作用 2. 主从复制流程 3. 搭建Redis 主从复制 3.1准备环境 3.2安装redis 3.3创建redis工作目录 3.4环境变量 3.5定义systemd服务管理脚本 3.6修改 Redis 配置文件&#xff08;Master节点操作&#xff09; 3.7修改 …

我在「亚马逊云科技中国峰会」做讲师 - 「程序员的社区成长史」

文章目录 ⭐️ Part - 〇&#xff1a;开场的自我介绍⭐️ Part - ①&#xff1a;程序员的学习从技术社区开始&#x1f31f; 编程初学者共同面对的迷茫&#x1f31f; 加入一个适合自己的技术社区&#x1f31f; 反哺社区做有价值的贡献者 ⭐️ Part - ②&#xff1a;与技术社区的…

STM32F4 WiFi上传温度【ds18b20传感器、网络通信】

通过WIFI或GPRS上传温度到云端 本篇博客将介绍如何使用WIFI或GPRS模块将温度数据上传到云端。我们将涵盖连接网络的过程、上传数据的过程以及相关代码。 准备工作 在开始之前&#xff0c;我们需要准备以下材料&#xff1a; STM32F4开发板温度传感器&#xff08;例如18B20&a…

ch0_汇编介绍

1. 汇编作用 1.1 1.2 1.3 2.  机器语言到汇编语言 2.1 2.2 2.3 3.  计算机的组成 3.1 指令和数据是存放在存储器中的&#xff0c; 而计算机包含多种存储器&#xff1b; 但是&#xff0c;在计算机工作的过程中&#xff0c; 指令和数据则必须存放到内存中。 而对于…

代码随想录二刷day41 | 动态规划之 343. 整数拆分 96.不同的二叉搜索树

day41 343. 整数拆分确定dp数组&#xff08;dp table&#xff09;以及下标的含义确定递推公式dp的初始化确定遍历顺序举例推导dp数组 96.不同的二叉搜索树确定dp数组&#xff08;dp table&#xff09;以及下标的含义确定递推公式dp数组如何初始化确定遍历顺序举例推导dp数组 34…

【每日一题Day254】LC445两数相加Ⅱ | 链表反转 栈

两数相加Ⅱ【LC445】 给定两个 非空链表 l1和 l2 来代表两个非负整数。数字最高位位于链表开始位置。它们的每个节点只存储一位数字。将这两数相加会返回一个新的链表。 可以假设除了数字 0 之外&#xff0c;这两个数字都不会以零开头。 原来是专题模拟 反转链表 2022/11/4 思…

MySQL 记一个调优记录:最大化获取 uid 和 mobile

目录 前言调优过程总结 前言 环境&#xff1a;MySQL 5.6、windows 11 前阵子&#xff0c;有一个 BI 看板跑不起来&#xff0c;每次执行跑了很久&#xff0c;还不一定有结果&#xff0c;急需维护迭代。 经过调试&#xff0c;发现看板的SQL 逻辑中有一个开销非常大的逻辑影响了整…

2 Prometheus 简介

目录 1. 起源 2. Prometheus 架构 2.1 指标收集 2.2 服务发现 2.3 聚合和警报 2.4 查询数据 2.5 服务自治 2.6 冗余和高可用性 2.7 可视化 3. Prometheus数据模型 3.1 指标名称 3.2 标签 3.3 采样数据 3.4 符号表示 3.5 保留时间 4. 安全模型 5. Prometheus生态…

AI会取代程序员吗?这几个事实告诉你真相

人工智能&#xff08;AI&#xff09;的迅猛发展引起了许多关于其对各行各业的影响的讨论&#xff0c;其中包括程序员的未来。有人认为&#xff0c;AI的出现可能会使程序员岗位面临消失的风险&#xff0c;因为它们可以自动化编码和解决问题的过程。然而&#xff0c;在我们下结论…

MySQL:UNION的使用

UNION的使用 前言一、合并查询结果二、语法格式&#xff1a;三、UNION操作符四、UNION ALL操作符五、使用 前言 本博主将用CSDN记录软件开发求学之路上亲身所得与所学的心得与知识&#xff0c;有兴趣的小伙伴可以关注博主&#xff01; 也许一个人独行&#xff0c;可以走的很快…

全志V3S嵌入式驱动开发(full image制作和资料汇总)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 所谓的full image制作&#xff0c;就是制作一个image&#xff0c;上面包含了所有的嵌入式软件、库和配置文件。之前虽然我们也构建了spi-nor、spi-…

AC/DC(二): 整流

一、全波整流电路 全波整流电路可以看作是由两个半波整流电路组合而成&#xff0c;如图1所示&#xff0c; 图1 变压器次级线圈中间引出一个中心抽头&#xff0c;把次级线圈分成两个对称的绕组&#xff0c;从而引出大小相等但极性相反的两个电压VD1、VD2&#xff0c;构成VD1、…

14 MFC多进程

文章目录 创建进程win32子进程内容创建进程传递参数关闭进程通过配置文件读取全部代码 打开进程便利进程 创建进程 分别创建MFC应用程序和Win32应用程序 MFC应用程序界面设置 win32子进程内容 #include <Windows.h> int WINAPI wWinMain(HINSTANCE hInstance, HINSTAN…

分享在Linux下使用OSGi.NET插件框架快速实现一个分布式服务集群的方法

在这篇文章我分享了如何使用分层与模块化的方法来设计一个分布式服务集群。这个分布式服务集群是基于DynamicProxy、WCF和OSGi.NET插件框架实现的。我将从设计思路、目标和实现三方面来描述。 1 设计思路 首先&#xff0c;我来说明一下设计思路。我们先来看看目前OSGi.NET插件…

C++中生成二维码-libqrencode

文章目录 前言libqrencode在qt中调用libqrencode其他 前言 二维码的种类很多。本文仅介绍&#xff0c;如何用C生成QR码(QRcode)。通常而言&#xff0c;我们不需要知道QR码的详细结构&#xff0c;如QrCode的结构原理与实战 | 张展鹏的博客。我们只需要&#xff0c;可以将文本转…

Python——— 字符串

&#xff08;一&#xff09;字符串 字符串是 Python 中最常用的数据类型。我们可以使用引号 ( 或 " ) 来创建字符串。顾名思义&#xff0c;羊肉串是由羊肉做成的串&#xff0c;而字符串就是由字符组成的。 字符串的本质是&#xff1a;字符序列。 2 Python 不支持单字符…