SpringBoot 集成 canal

news2024/12/24 8:54:05

 什么是 Canal

        阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
        Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。

MySQL 的 Binlog

        MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。
        一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:
        其一:MySQL Replication 在 Master 端开启 Binlog,Master 把它的二进制日志传递给Slaves来达到 Master-Slave 数据一致的目的。
        其二:自然就是数据恢复了,通过使用 MySQL Binlog 工具来使恢复数据。
        二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。

Binlog 的分类

MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配置 binlog_format= statement|mixed|row。三种格式的区别:
        1)statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间  但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。
        优点:节省空间。
        缺点:有可能造成数据不一致。
        2)row:行级, binlog 会记录每次操作后每行记录的变化。
        优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。
        缺点:占用较大空间。
        3)mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement模式不一致问题,默认还是 statement。

             statement在某些情况下 会按照ROW 的方式进行处理譬如:

              1:当函数中包含 UUID() 时;

              2: 包含AUTO_INCREMENT 字段的表被更新时;

              3: 执行 INSERT DELAYED 语句时;

              4: 用 UDF 时;

        优点:节省空间,同时兼顾了一定的一致性。
        缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 的监控的情况都不方便。
综合上面对比,Canal 想做监控分析,选择 row 格式比较合适。 

Canal 的工作原理 

MySQL 主从复制过程

        1)Master 主库将改变记录,写到二进制日志(Binary Log)中;

        2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷到它的中继日志(relay log);

        3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。 

 Canal 的工作原理

原理相对比较简单:

  1. canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave,向 mysql master 发送 dump 协议

  2. mysql master 收到 dump 请求,开始推送 binary log 给 slave (也就是 canal)

  3. canal 解析 binary log 对象 (原始为 byte 流)。

Canal使用场景

1)原始场景: 阿里 Otter 中间件的一部分 Otter 是阿里用于进行异地数据库之间的同步框架,Canal 是其中一部分。

 2)常见场景 1:更新缓存

3)常见场景 2:抓取业务表的新增变化数据,用于制作实时统计。 

Canal使用实战 

 检查Mysql binlog功能是否有开启

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.10 sec)

 如果显示状态为OFF表示该功能未开启,开启binlog功能,修改 mysql 的配置文件my.ini,追加内容:


log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
server_id=1 #mysql实例id,不能和canal的slaveId重复

service mysql restart 重启 mysql。

创建具有作为 MySQL slave的MySQL 账号

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

 下载安装Canal服务端

 https://github.com/alibaba/canal/releases

  • canal-adapter(canal-client)

        相当于canal的客户端,会从canal-server中获取数据(需要配置为tcp方式),然后对数据进行同步,可以同步到MySQL、Elasticsearch和HBase等存储中去。相较于canal-server自带的canal.serverMode,canal-adapter提供的下游数据接受更为广泛。

  • canal-admin

        为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

  • canal-deployer(canal-server)

        可以直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。接收到MySQL的binlog数据后可以通过配置canal.serverMode:tcp, kafka, RocketMQ, RabbitMQ连接方式发送到对应的下游。其中tcp方式可以自定义canal客户端进行接受数据,较为灵活。

修改 instance.properties配置文件,

#需要改成数据源mysql数据库的信息
canal.instance.master.address=127.0.0.1:3306
#需要改成自己的数据库创建的从库用户名与密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#需要改成同步的数据库表规则
canal.instance.filter.regex=.*\\..*

常见的匹配规则:

        所有表:.* or .\…

        canal 数据库下所有表: canal\…*

        canal数据库下的以canal打头的表:canal.canal.*

        canal 数据库下的一张表:canal.test1

        多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)

监听多个Mysql实例配置

        如果需要监听多个Mysql实例,通过前面 canal 架构,我们可以知道,一个 canal 服务 中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的 配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直 接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改 canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。

运行Canal服务端 sh bin/startup.sh(win下是运行 startup.bat)

Springboot集成Canal客户端

创建canal-clint SpringBoot工程

 在canal-clint 模块中配置 pom.xml

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>

 创建单机版Canal客户端SimpleCanalClientExampleTest 

package com.canal.clint.clint;

/**
 * <p>
 * </p>
 * @since 2023-03-30 17:13
 */

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

public class SimpleCanalClientExampleTest {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // 1.获取 canal 连接对象
        CanalConnector canalConnector =
            CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        while (true) {
            // 2.获取连接
            canalConnector.connect();
            //  3.指定要监控的数据库,此处指定了要监听的库,会覆盖instance.properties配置的数据库表规则
            canalConnector.subscribe("intl.*");
            // 4.获取 Message
            Message message = canalConnector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size() <= 0) {
                System.out.println("没有数据,休息一会");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                for (CanalEntry.Entry entry : entries) {
                    // TODO 获取表名
                    String tableName = entry.getHeader().getTableName();
                    // TODO Entry 类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // TODO 判断 entryType 是否为 ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // TODO 序列化数据
                        ByteString storeValue = entry.getStoreValue();
                        // TODO 反序列化
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // TODO 获取事件类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // TODO 获取具体的数据
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        // TODO 遍历并打印数据
                        for (CanalEntry.RowData rowData : rowDataList) {
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            JSONObject beforeData = new JSONObject();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }
                            System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",After:"
                                + beforeData + ",After:" + afterData);
                        }
                    }
                }
            }
        }
    }
}

 创建数据库user表

CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `name` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  `remark` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

插入数据

INSERT INTO `intl`.`user`(`id`, `name`, `remark`) VALUES (1, '哈喽', 'Canal测试');

输出结果

注意坑:

  • 如果是基于阿里云服务器安装的Canal,记得开放11111端口(Canal的默认端口号);
  • 如果客户端调用了connector.subscribe("intl.*")方法,指定要监听的库,会覆盖instance.properties配置的数据库表规则;
  • 如果Mysq binlog日志类型设置为mixed可能会导致connector.subscribe("intl.*")方法失效,进而监听整个Mysql实例。

Canal的部署也是支持集群的,需要配合ZooKeeper进行集群管理。Canal还有一个Web管理界面。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/644837.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux常用指令和知识

ls 显示工作目录底下的所有文件/文件夹 使用命令ls, 会直接显示HOME目录下的所有文件 如果不加任何参数,那么ls指定的目录是初始的HOME目录(因为初始的工作目录为HOME目录): 使用ls / 会显示根目录底下的所有文件 如何查看根目录: 三个参数: -a -h -l -a 选项表示all的意思,列…

Python 基于招聘数据可视化系统

1 简介 Python 基于招聘数据可视化系统&#xff0c;视频效果如下&#xff1a; 基于Python的招聘信息可视化系统&#xff0c;附源码 随着国内的经济不断的快速发展&#xff0c;现在学生的就业压力也在逐年增加&#xff0c;网络上的招聘信息非常的丰富&#xff0c;但是对于学生而…

百城巡展 | 人大金仓6月阔步新征程全力开新局

6月上旬&#xff0c;人大金仓“百城巡展”走过天津、杭州、成都&#xff0c;吸引线上线下逾6660人参与&#xff0c;并有14家新成员单位加入金兰生态组织&#xff0c;共同支撑用户更多关键性应用需求&#xff0c;为人大金仓开拓新市场、赋能新行业、构建新生态迈出坚实的一步。 …

驾驶舱数据指标体系设计

大数据时代下&#xff0c;各行各业面对众多的顾客和复杂多变的市场需求&#xff0c;要想及时适应市场变化&#xff0c;掌握市场动态&#xff0c;就需要对各个环节的数据进行分析&#xff0c;得到科学有效的结论来指导决策&#xff0c;这就离不开领导驾驶舱。 — 01 — 什么是…

Vue中如何进行数据可视化图表展示

Vue中如何进行数据可视化图表展示 数据可视化是现代化的数据分析和展示方式&#xff0c;可以使数据更加直观、易于理解和传达。Vue作为一款流行的前端框架&#xff0c;提供了丰富的插件和工具来实现数据可视化图表展示&#xff0c;其中最常用的是Echarts和D3.js。 本文将介绍…

Mind2Web: 首个全面衡量大模型上网能力的数据集

夕小瑶科技说 原创 作者 | 智商掉了一地、ZenMoore 在互联网的浩瀚世界中&#xff0c;存在着无数复杂而扑朔迷离的任务等待我们去解决。如果要设计一个解决很多问题的通用智能体&#xff08;AI agent&#xff09;&#xff0c;无论是关于购物、旅行、学习还是娱乐&#xff0c;…

张驰咨询:如何评估六西格玛咨询公司的专业水平和实际效果?

六西格玛是一个能够帮助企业改进业务流程&#xff0c;提高质量和效率的方法论和工具&#xff0c;也是一种经营管理思想。在选择六西格玛咨询公司时&#xff0c;就需要考虑以下几个方面。 1、咨询公司的信誉和口碑 首先要查明咨询公司的资质和信誉。可以从市场上那些知名度比较…

微信小程序反编译报SyntaxError: Unexpected token ‘}‘ 不完美的解决方法

文章目录 1.反编译报错2.分析已反编译出来的文件3.错误原因4.没有完美解决的方法5.小笔记6.相关链接 1.反编译报错 最近在搞小程序&#xff0c;参考Csdn博客的微信小程序反编译Blog&#xff0c;一步一步操作&#xff0c;获取到了.wxapkg&#xff0c;在wxappUnpacker目录下执行…

Vue中如何进行分布式错误日志收集与监控

Vue中如何进行分布式错误日志收集与监控 随着前端界面的复杂化&#xff0c;前端错误日志的收集和监控也成为了一个重要的问题。在分布式应用中&#xff0c;需要跨多个前端应用和后端服务收集和监控错误日志。本文将介绍如何在 Vue 中使用 Sentry 进行分布式错误日志收集和监控…

应急响应:系统入侵排查指南

目录 系统基本信息排查 Windows系统排查 Linux系统排查 CPU信息 操作系统信息 载入模块排查 用户排查 Windows系统用户排查 排查所有账户 Linux用户排查 root账户排查 查看所有可登录账户 查看用户错误的登录信息 查看所有用户最后登录信息 排查空口令账户 启…

从开发到部署:一站式指南创建个性化 Slack App 问答机器人

从开发到部署&#xff1a;一站式指南创建个性化 Slack App 问答机器人 01 简介 做这个教程是因为看别人拿免费的割韭菜很不爽&#xff0c;所以准备做个教程来教大家如何搭建一个问答机器人 内核其实就是利用了slack提供的官方api&#xff0c;自己创建app然后获取艾特信息&#…

Python3数据分析与挖掘建模(15)特征选择与特征变换

1 特征选择 1. 1 概述 特征选择是一种剔除与标注不相关或冗余的特征的方法&#xff0c;以减少特征集的维度和复杂性&#xff0c;并提高模型的性能和解释能力。特征选择的目标是选择那些对目标变量有预测能力且与其他特征不冗余的特征。 特征选择的方法可以分为三类&#xff…

【国产虚拟仪器】基于ARM+FPGA+8通道高速AD代替美国国家仪器的电能质量分析仪设计(一)NI方案介绍

一、背景&#xff1a;基于美国国家仪器的采集方案介绍 本文设计的电能质量分析仪数据分析系统以NI公司的National Instruments LabVIEW2018作为软件开发平台&#xff0c;结合硬件平台&#xff0c;实现数据的采集、波形显示和数据 分析。硬件电路的主要作用是对电网信号进行降幅…

聚焦产品研发,极米科技创新能力领跑行业

近年来&#xff0c;在消费升级、线上渠道迅速放量的背景下&#xff0c;家用智能投影已成为中国投影仪的第一大细分市场。有数据显示&#xff0c;2017年以来&#xff0c;中国消费级投影机出货量持续提升。根据第三方机构IDC&#xff08;国际数据公司&#xff09;统计&#xff0c…

Matter实战系列-----3.Matter Light和Switch配网和控制实验

专有名词&#xff1a; OT-RCP&#xff1a; Open Thread Radio Co-Processor 。 Thread 无线协处理器 OTBR&#xff1a; Open Thread Board Router 。 Thread 边界路由器 chip-tool&#xff1a; Linux 应用程序。用于 Matter 协议控制 ot-ctl&#xff1a; Thread 网络控…

爬虫一定要用代理ip吗?

使用代理IP可以帮助爬虫隐藏真实IP地址&#xff0c;防止被网站封禁或限制访问。此外&#xff0c;使用代理IP还可以帮助爬虫绕过一些地区或国家的访问限制&#xff0c;获取更多的数据。因此&#xff0c;对于一些需要频繁爬取数据的爬虫&#xff0c;使用代理IP是一个不错的选择。…

2023如何选择适合自己的浪涌保护器

浪涌保护器对许多人来说并不熟悉&#xff0c;但是如果您担心您拥有的电子设备和家用电器的安全性并保护它们免受电涌的影响&#xff0c;那么您必须了解电涌保护器以及它们的工作原理。 在没有电涌保护器的情况下直接使用昂贵的电子家用电器是危险的&#xff0c;即使发生电涌的…

【taro react】---- 解决H5接入uni-app版本的IM

1. 问题 由于项目开发比较紧张&#xff0c;腾讯 IM 的接入就使用了 TUIKit 含UI集成方案&#xff0c;遇到的问题&#xff0c;uni-app的UI本来就是一个单独的项目&#xff0c;需要集成到现有的 Taro React 中&#xff0c;就只能作为一个独立的项目&#xff0c;不跳转时不影响原有…

原型模式(七)

不管怎么样&#xff0c;都要继续充满着希望 上一章简单介绍了抽象工厂模式(六), 如果没有看过,请观看上一章 一. 原型模式 引用 菜鸟教程里面的原型模式介绍: https://www.runoob.com/design-pattern/prototype-pattern.html 原型模式&#xff08;Prototype Pattern&#xf…