Java开发 - Canal的基本用法

news2025/1/24 11:36:45

前言

今天给大家带来的是Canal的基本用法,Canal在Java中常被我们用来做数据的同步,当然不是MySQL与MySQL,Redis与Redis之间了,如果是他们,那就好办了,我们可以直接通过配置来完成他们之间的主从、主主,级联等的同步,为什么要用Canal呢?主要是为了完成MySQL与Redis、MySQL与ES之间的数据同步,其本质是同步的过程中降低代码的耦合度,否则我们完全可以通过代码分别往几种不同的存储方存储数据。

认识Canal

什么是Canal

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

下面这张图可以代表Canal的用途,就染我们来一起瞻仰一下:

在看到这张图后,我们要感谢开发者的付出,提供给我们这么好的工具,目前来说,很多公司做数据同步都是采用的这种方式,可以通过Canal分别向MySQL,ES里同步数据。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

基本原理

Canal的实现主要利用了MySQL主从复制的原理,细分如下:

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

也就是说,Canal将自己伪装成一个MySQL的从库,像其他的Slava一样,向Master发送dump 协议,MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ),canal 解析 binary log 对象(原始为 byte 流)。

Canal准备

第一次接触Canal的小伙伴点击下面链接下载Canal:

Releases · alibaba/canal · GitHub

不要使用太新的版本,我们就用1.1.4的版本: 

下载完成之后放在一个英文路径下,我们改下文件夹的名字canal,下有四个文件夹:

 

MySQL配置

这里,我们不需要去配置MySQL的主从,如果你想了解,不妨去看这篇博客:

Java开发 - MySQL主从复制初体验

这里有你想要的主从配置,和对主从配置的一些心得体会。

在此处,我们只需要开启一个MySQL服务,设置一个连接的用户和密码,整体上和配置MySQL主从的步骤差不多,因为本质上也是要把Canal配置成MySQL的Slava的。

MySQL服务开启了吧?那么登陆MySQL服务,我们先来创建并授权一个用户.

创建用户:

CREATE USER 'canal'@'%' IDENTIFIED WITH 'mysql_native_password' BY '123456';

mysql8.0和5.x其中一个改动就是加密认证方式发生改变,这个在上面提到的MySQL主从复制里有提到,caching_sha2_password是8.0, mysql_native_password是5.x,canal我们这里都采用mysql_native_password的方式创建密码。

远程授权: 

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'  WITH GRANT OPTION;

刷新权限:

FLUSH PRIVILEGES;

修改my.cnf文件,这个根据自己mysql安装位置的路径去找,但似乎这个文件大多情况是不存在的,所以我们直接在etc目录下创建一个用就行,实在害怕,可以运行如下命令查看my.cnf的默认运行位置:

 mysql --help | grep 'my.cnf'

 

所以在默认路径下:/usr/local/Cellar/mysql/版本号/ ,此处没有etc文件,自己手动创建吧,不要怂,接着:

进入etc文件,在这里运行:

vim my.cnf

输入:

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 不要和canal的slaveId重复即可
server_id=1

 退出并保存,然后重启mysql。

检查mysql的binlog是否开启:

show variables like 'log_bin';

 

已开启。

检查binlog_format:

show variables like "%binlog_format%";

 

显示ROW,代表我们设置生效。

检查server_id:

show variables like "%server_id%";

 

我们设置的1,已生效。 

查看当前正在写入的binlog文件:

show master status;

 

我们主要看的就是这两个参数,记住,到此为止,不要再动数据库的任何东西,否则这两个数据会改变,对我们配置canal会有影响。 上面的两个参数,我们在稍后配置canal的时候需要。

额。。。。。不过,这俩参数其实可以不用设置,不设置就代表从最新的地方开始同步,博主已经试过了,没问题。

Canal配置

我们打开刚刚下载的canal文件夹,打开这个路径下的文件:conf/example/instance.properties:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=157
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

我们需要改的核心参数暂时不多,如下:

canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=157

canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

其他的暂时先不用改,后续将到实际应用的时候会讲,这几个参数不用博主说大家也应该知道什么意思了吧?保存一下。

现在我们来启动canal,canal的启动很简单,打开一个命令行工具,直接把bin/startup.sh文件拖进去回车就可以了,方式不固定:

 

命令行输出了一大段内容,但我们不知道canal启动成功了没,我们来看下:

 

通过jps可以看到CanalLauncher的进程号,看来应该是没问题的。 

单纯的Canal监听测试

下面我们创建一个最简单的Spring Boot工程,过程就不赘述了:

首先我们引入依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

版本号要和我们使用的一致。  

添加配置:

canal:
  serverAddress: 127.0.0.1
  serverPort: 11111
  instance:
    - example

在CannalClient类使用Spring Bean的生命周期函数afterPropertiesSet(),切记,这里只是监听,并不是真正项目上使用,不要照搬,此处知识单传让大家看到canal监听的效果:

package com.codingfire.canal.Client;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements InitializingBean {
    private final static int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                System.out.println(message.getEntries().size());
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("----------------");
                    //如果有数据,处理数据
                    //遍历entries,单条解析
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        //获取表名
                        String tableName = entry.getHeader().getTableName();
                        //获取类型
                        CanalEntry.EntryType entryType = entry.getEntryType();
                        //获取序列化后的数据
                        ByteString storeValue = entry.getStoreValue();
                        //判断entry类型是否为ROWDATA类型
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                            //反序列化
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            //获取当前事件操作类型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            //获取数据集
                            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                            //遍历
                            for (CanalEntry.RowData rowData : rowDatasList) {
                                //改变前数据
                                JSONObject jsonObjectBefore = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    jsonObjectBefore.put(column.getName(),column.getValue());
                                }
                                //改变后数据
                                JSONObject jsonObjectAfter = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    jsonObjectAfter.put(column.getName(),column.getValue());
                                }
                                System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
                            }
                        }else {
                            System.out.println("当前操作类型为:"+entryType);
                        }
                    }
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}

下面,就到了最激动人心的时刻,请运行我们的Spring Boot工程:

 

看到这里,就代表启动成功了,下面,我们连接数据库:

mysql -uroot -p123456

随便你是哪个用户连接的都行,没有数据库,你就创建新的数据库,如果已经有了,那么你直接操作里面的数据库表即可,博主目前有一个canal数据库,我们就用这个数据库:

use canal;

博主里面有一张用户表,操作里面的表:

insert into user value(null ,'小明','123456',20,'13812345678');

现在查看控制台有没有监听到数据库变化:

  

可以看到控制台已经打印出了我们刚刚操作的SQL,测试成功。

注意:这里只是监控,并不是真实使用场景,只是让大家直观看到SQL语句被监听到的场景,实际应用中,我们会结合MQ来使用,但不在这篇讲解。 

结语

这篇博客只是canal 的基本配置和监听机制的讲解,旨在帮助大家了解canal的工作方式,在下一篇博客中,我们将结合MQ来做数据的同步,所以大家也不要着急,咱们慢慢来,一步一给脚印,一定要把基础知识学扎实,canal的配置相较于MySQL的主从还是很相似的,也比较简单,主要都是配置项,所以更需要我们细心,不要出错,否则一个参数的错误都是导致系统无法正常运行。好了,咱们下篇再见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/703143.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【AUTOSAR】BMS开发实际项目讲解(三十)----电池管理系统电池SOH和SOE估算

电池SOH估算 关联的系统需求 Sys_Req_4004、Sys_Req_4005; 功能实现描述 SOH主要包括以下内容&#xff1a; SOH模块输入信息 序号 参数 说明 1 满电电压 4.14V 3 电芯OCV曲线 [CELL] 4 充放电循环次数 [CELL] 2 电芯循环衰减数据表 [CELL] SOH算法 ID Descr…

在docker中使用tomcat

检查本地操作系统版本&#xff1a; [rootnode ~]# cat /etc/os-release NAME"CentOS Linux" VERSION"7 (Core)" ID"centos" ID_LIKE"rhel fedora" VERSION_ID"7" PRETTY_NAME"CentOS Linux 7 (Core)" ANSI_COLO…

第8届Python编程挑战赛初赛真题剖析-2022年全国青少年信息素养大赛

[导读]&#xff1a;超平老师计划推出《全国青少年信息素养大赛Python编程真题解析》50讲&#xff0c;这是超平老师解读Python编程挑战赛系列的第1讲。 全国青少年信息素养大赛&#xff08;原全国青少年电子信息智能创新大赛&#xff09;是“世界机器人大会青少年机器人设计与信…

如何做好云渗透测试的威胁建模(上)

NO.1 威胁建模相关定义 微软针对威胁建模&#xff08;Threat modeling&#xff09;的描述&#xff1a;威胁建模是帮助保护系统、应用程序、网络和服务的有效方法。这是一种工程技术&#xff0c;用于识别潜在的威胁和建议&#xff0c;以帮助降低风险并在开发生命周期的早期实现…

nodejs接口联动获取req的各种类型数据,搭建可视化流程引擎平台

nodejs接口联动获取req的各种类型数据&#xff0c;搭建可视化流程引擎平台 搭建nodejs服务创建对外开放端口&#xff0c;获取基础数据GET—queryGET—paramsPOST 联动MYSQL数据库获取websites表的所有书链接数据库 插入 搭建nodejs服务 考虑跨域等性能&#xff0c;简易的工程化…

基于SpringBoot和Mybatis用两种方式实现分页查询

上一篇文章中封装了通用的返回结果、通用分页结果。本文具体记录一下如何基于Mybatis实现分页查询。 参考文章&#xff1a; Github:pagehelper-spring-boot Github:HowToUse.md mybatis-plus分页查询三种方法 MyBatis-Plus分页插件 系列文章指路&#x1f449; 系列文章-基于Sp…

这会是下一代的 Java 程序员的技术栈吗?

Servlet 与 Reactive 技术栈 打开 Spring 的官方文档我们在 Reactive 一栏中可以看到下面的架构图&#xff0c;其中可以很明显的看到 Reactive 的技术栈跟 Servlet 技术栈是完全并行的。意思是说我们日常开发的 Servlet web 类型只是一半的内容&#xff0c;还有另外一半世界就…

TI AM64x开发板规格书(双核ARM Cortex-A53 + 单/四核Cortex-R5F + 单核Cortex-M4F,主频1GHz)

1 评估板简介 创龙科技TL64x-EVM是一款基于TI Sitara系列AM64x双核ARM Cortex-A53 单/四核Cortex-R5F 单核Cortex-M4F多核处理器设计的高性能评估板&#xff0c;由核心板和评估底板组成。核心板经过专业的PCB Layout和高低温测试验证&#xff0c;高性能低功耗&#xff0c;稳…

3.2C++抽象类

C 抽象类 C的抽象类是指至少包含一个纯虚函数的类&#xff0c;不能被实例化。 抽象类的作用是为了实现接口的统一规范&#xff0c;使得不同的子类可以通过实现相同的纯虚函数来实现不同的行为。 定义抽象类时&#xff0c;需要在类中至少包含一个纯虚函数。 抽象类不能被实例…

idea如何连接数据库

输入数据库的用户名和密码就行 Database这里些数据库的名字 快速打开数据库设计 这句话是说时区有问题&#xff1a;因为Mysql用的是美国时区&#xff0c;要搞成中国时区&#xff1a; set global time_zone8:00; 打开cmd窗口&#xff1a; 输入 mysql -u root -p 密码&…

MySql高级篇-006 MySQL架构篇-02MySQL的数据目录:数据库下的主要目录结构、文件系统如何存储数据

第02章_MySQL的数据目录 1.MySQL8的主要目录结构 # 查询名称叫做mysql的文件目录都有哪些[rootatguigu07 ~]# find / -name mysql安装好MySQL 8之后&#xff0c;我们查看如下的目录结构&#xff1a; 1.1 数据库文件的存放路径 MySQL数据库文件的存放路径&#xff1a;/var/…

华为手环8 六月免费表盘上线,让你的腕上表盘更丰富多彩

在如今这个科技与艺术相互融合的时代&#xff01;工程师们给华为手环8花粉朋友准备的6月这一期好看又免费表盘已经上线。你可以根据自己的审美&#xff0c;选择一款最适合自己的表盘&#xff0c;展现出自己的个性和品味。快来选择你的专属表盘&#xff0c;让华为手环8成为你独特…

移动WEB开发之rem适配布局

css基础&#xff08;一&#xff09;css基础&#xff08;一&#xff09;_上半场结束&#xff0c;中场已休息&#xff0c;下半场ing的博客-CSDN博客Emmet语法Emmet语法_上半场结束&#xff0c;中场已休息&#xff0c;下半场ing的博客-CSDN博客css基础&#xff08;二&#xff09;c…

途乐证券|股市里的游资是什么?有什么特点?

股市是一个充满风险和机会的地方&#xff0c;吸引了各类投资者的目光&#xff0c;游资就是其中一种。那么股市里的游资是什么&#xff1f;有什么特点&#xff1f;途乐证券也为大家准备了相关内容&#xff0c;以供参考。 股市里的游资是什么&#xff1f; 一般来说&#xff0c;股…

【JavaScript】使用qrcode生成二维码

QRCode简介&#xff1a; QRCode.js 是一个用于生成二维码的 JavaScript 库。主要是通过获取 DOM 的标签,再通过 HTML5 Canvas 绘制而成,不依赖任何库。 使用步骤&#xff1a; 点击下载&#xff1a;https://github.com/davidshimjs/qrcodejs 或者mirrors / davidshimjs / qrco…

selenium 自动化测试 1-如何搭建自动化测试环境,搭建环境过程应该注意的问题

目录 前言&#xff1a; 1、selenium的介绍 2、selenium的版本介绍 3. selenium工作原理 4. selenium安装 4.1 安装selenium 4.2 安装浏览器驱动 4.3 安装浏览器 4.4 验证安装结果 5. 浏览器基本操作 前言&#xff1a; 搭建Selenium自动化测试环境是进行Web应用程序自…

京东到家小程序-在性能及多端能力的探索实践 | 京东云技术团队

一、前言 京东到家小程序最初只有微信小程序&#xff0c;随着业务的发展&#xff0c;同样的功能需要支持容器越来越多&#xff0c;包括支付宝小程序、京东小程序、到家APP、京东APP等&#xff0c;然而每个端分开实现要面临研发成本高、不一致等问题。 为了提高研发效率&#…

关于栈和队列的几个题

思维导图&#xff1a; 1.匹配括号 题目如下&#xff1a; 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。 左括号必须以…

【灾报警主机联网问题】

火灾报警主机联网问题一直是各消防项目中的难点&#xff0c;特别是管廊等长距离通讯中&#xff0c;如何保证通讯信号长期稳定可靠的运行是需要工程重点解决的问题&#xff0c;而LCAN-FOBR系列环网冗余式CAN光纤转换器提供二路光通道和一路CAN通道&#xff0c;实现CAN与光纤之间…

基于ENVI的遥感影像的非监督分类

ENVI包括了ISODATA和K-Mean两种非监督分类方法。 ISODATA&#xff08;Iterative Self-Orgnizing Data Analysize Technique&#xff09;是一种重复自组织数据分析技术&#xff0c;计算数据空间中均匀分布的类均值&#xff0c;然后用最小距离技术将剩余像元进行迭代聚合&#x…