Canal入门使用

news2025/1/23 13:05:21

说明:canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费(官方介绍)。一言以蔽之,Canal是一款实现数据同步的组件。可以实现数据库之间、数据库与Redis、ES之间的数据同步。本文介绍Canal的入门使用。

Canal介绍

Canal实现原理是伪装成MySQL主节点的从节点,接收主节点的binlog日志,解析、提取数据库操作,将对数据库的操作通过代码更新到其他组件中,如其他数据库、ES、Redis等,官方解释如下:

在这里插入图片描述

官方提供的结构图如下:

在这里插入图片描述

Canal安装

首先,从官网上下载Canal服务器,地址:https://github.com/alibaba/canal/releases

在这里插入图片描述

下载下来,解压,如下:

在这里插入图片描述

canal配置文件暂时不用管,先修改一下example文件中监测的目前节点配置,修改成自己需要监测的MySQL配置,如下:

在这里插入图片描述

修改完,启动canal服务,双击startup.bat文件,如下:

在这里插入图片描述

Canal使用

只要你的MySQL服务器的IP、账号密码没输错,且测试过能用Navicat或其他数据库连接工具成功连接数据库,那么就可以进行下面的编码工作了。

首先,创建一个Maven项目,pom.xml如下,导个canal依赖就行了

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hezy</groupId>
    <artifactId>canal_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!--canal客户端-->
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
    </dependencies>

</project>

测试代码如下,用来连接canal服务器,打印canal监测到的数据内容;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;

/**
 * Canal处理器
 * 作用:打印canal服务器监测到的数据
 */
public class CanalHandler {
    public static void main(String[] args) throws InvalidProtocolBufferException {

        // 1.创建连接
        CanalConnector canalConnector = CanalConnectors
                .newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", "");

        // 2.抓取数据
        while (true) {

            // 3.开始连接
            canalConnector.connect();

            // 4.订阅数据,所有的库和表
            canalConnector.subscribe(".*\\..*");

            // 5.抓取数据,每次抓取100条
            Message message = canalConnector.get(100);

            // 6.获取entry集合
            List<CanalEntry.Entry> entries = message.getEntries();

            // 7.判断是否有数据
            if (entries.size() == 0) {
                System.out.println(">>>暂无数据<<<");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // 8.解析数据
                for (CanalEntry.Entry entry : entries) {
                    // 获取表名
                    String tableName = entry.getHeader().getTableName();
                    // 获取操作类型
                    CanalEntry.EntryType entryType = entry.getEntryType();

                    // 判断entryType是否为ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 序列化数据
                        ByteString storeValue = entry.getStoreValue();
                        // 反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

                        // 获取事件类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 获取具体的数据
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        // 遍历打印
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            // 获取拉取前后的数据
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

                            // 用Map存储每条数据
                            HashMap<String, Object> beforeMap = new HashMap<>();
                            HashMap<String, Object> afterMap = new HashMap<>();

                            // 获取不同操作的数据
                            if (CanalEntry.EventType.INSERT.equals(eventType)) {
                                System.out.println("【" + tableName + "】表插入数据");
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("新增数据:" + afterMap);
                            } else if (CanalEntry.EventType.UPDATE.equals(eventType)) {
                                System.out.println("【" + tableName + "】表更新数据");
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("更新前:" + beforeMap);
                                System.out.println("----");
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("更新后:" + afterMap);
                            } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
                                System.out.println("【" + tableName + "】表删除数据");
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("被删除的数据:" + beforeMap);
                            }
                        }
                    }
                }
            }
        }
    }
}

启动程序,查看控制台,检测中……

在这里插入图片描述

使用Navicat连接数据库,查看数据库test库,i_user表内容;

在这里插入图片描述

此时,我们新增一条数据,看控制台,canal成功接收到了这次修改;

在这里插入图片描述

更新数据;

在这里插入图片描述

删除数据;

在这里插入图片描述

头能过身体就能过,接下来不就好办了。将Canal接收到的数据转为对象,根据不同的操作类型分发给自己想要同步的组件,同步给从MySQL,就调用对应的Mapper;同步给Redis,就调用Redis对应的方法,ES同样。

总结

本文介绍了Canal入门使用,参考B站视频:

  • Canal极简入门:一小时让你快速上手Canal数据同步神技~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1630350.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络相关知识总结

1、网口设置 网口设置IP&#xff0c;即操作/etc/sysconfig/network-scripts路径下的ifcfg-xx文件 主要参数详解&#xff1a; DEVICE:网口名 ONBOOT&#xff1a;表示启动系统时是否激活网卡&#xff0c;yes为激活&#xff0c;no不激活 HWADDR:mac值 DEFROUTE://默认路由设置…

[C++基础学习]----01-C++数据类型详解

前言 C是一种静态类型的编程语言&#xff0c;它提供了丰富的数据类型来存储和操作数据。这些数据类型为C程序员提供了丰富的选择&#xff0c;可以根据具体需求来选择最合适的类型来存储和操作数据。下面详细解释一些常见的C数据类型&#xff0c;包括其原理和使用方法&#xff1…

ADOP带您科普什么是光纤网卡,它跟普通网卡有什么区别?

光纤网卡&#xff0c;也称为网络适配器或网络接口卡&#xff08;NIC&#xff09;&#xff0c;是一种用于将计算机和服务器等设备连接到数据网络的硬件设备。它通常装有一个或多个端口&#xff0c;可以通过这些端口连接不同类型的网络线缆&#xff0c;如RJ45接口的网络跳线或SFP…

云贝餐饮连锁V2-2.9.9源码

云贝餐饮连锁V2独立版、版本更新至2.9.9&#xff0c;小程序、公众号版本&#xff0c;全插件&#xff0c;公众号小程序端&#xff0c;独立版&#xff1b; 带商家端&#xff0c;修复收银台、排队点餐、堂食点餐&#xff1b;最新版更新 搭建环境教程: 系统环境&#xff1a;CentO…

Wi-Fi HaLow:重塑物联网的未来

Wi-Fi HaLow&#xff1a;引领物联网连接的革命 数字时代的蓬勃发展正在引发一场深刻的变革&#xff0c;物联网已经融入到我们的日常生活和工作中&#xff0c;成为不可或缺的一部分。随着新一代Wi-Fi技术一Wi-Fi HaLow的崭露头角&#xff0c;有望在2024年及未来&#xff0c;重新…

stm32f4单片机强制类型转换为float程序跑飞问题

如题&#xff0c;在一个数据解析函数中使用了*(float *)&data[offset]&#xff0c;其中data为uint8类型指针&#xff0c;指向的value地址为 可以看到地址0x20013A31非对齐&#xff0c;最终在执行VLDR指令时导致跑飞 VLDR需要使用对齐访问 跑飞后查看SCB寄存器发现确实是非…

磁盘未格式化,数据恢复大揭秘

一、磁盘未格式化现象概述 在日常使用电脑的过程中&#xff0c;我们有时会遇到磁盘未格式化的提示&#xff0c;这意味着我们的磁盘突然间变得不可识别&#xff0c;所有的数据和文件都似乎消失了。这种情况常常发生在外接硬盘、U盘等存储设备上&#xff0c;给我们的工作和生活带…

LC 142. 环形链表 II

142. 环形链表 II 给定一个链表的头节点 head &#xff0c;返回链表开始入环的第一个节点。 如果链表无环&#xff0c;则返回 null。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评…

C++ ─── 隐式类型转换、static成员、友元、内部类

目录 1. explicit关键字 2. static成员 2.1 概念 2.2 特性 3. 友元 3.1 友元函数 3.2 友元类 4. 内部类 5. 再次理解类和对象 1. explicit关键字 构造函数不仅可以构造与初始化对象&#xff0c;对于接收单个参数的构造函数&#xff0c;还具有类型转换的作用。接收单个…

基因组组装:NextDenovo2 使用大全

简介 NextDenovo 是一种针对长序列读取&#xff08;包括CLR和ONT技术&#xff09;的新型基因组组装工具。它采取了一种“先校正错误再进行组装”的方法&#xff0c;这与canu工具类似&#xff0c;但对于PacBio HiFi读取数据则无需进行校正。相较于其他工具&#xff0c;NextDenov…

使用 Vitepress 构建博客并部署到 github 平台

前言 最近写了好多篇 Chrome 浏览器插件相关的文章&#xff0c;有十几二十篇&#xff0c;就想着构建个博客&#xff0c;用来放置相应的文章。 正好前段时间看到 VitePress 1.0.0 发布了&#xff0c;而且是用 markdown 写文章&#xff0c;正好写插件文章的时候文章都是 md 格式…

达梦数据查询语句不带模式名称,报错无效的表或视图名[某某表]

[执行语句1]: select * from sys_config 执行失败(语句1) -2106: 第2 行附近出现错误: 无效的表或视图名[SYS_CONFIG]1条语句执行失败 解决方案&#xff1a; 保证模式名和用户名一致&#xff0c;而且你当前登录的用户要和模式名一致 把用户换成一样的&#xff0c;查询就可以不 …

如何利用 GPT 自我提高写作能力

GPT革命&#xff1a;如何用AI技术重新定义写作 介绍 在我们的数字时代&#xff0c;了解自我提高写作的必要性至关重要。 随着 GPT 的兴起&#xff0c;我们正在见证书写的变革时代。 这篇扩展文章深入探讨了 GPT 如何显着提高写作技能。 拥抱未来&#xff1a; 人工智能时代的写…

HarmonyOS 应用开发——入门

首先当然是华为的官方文档了&#xff0c;要认真学习: https://developer.huawei.com/consumer/cn/doc/harmonyos-guides-V2/start-overview-0000001478061421-V2 不想花时间看&#xff0c;可以看我下面总结的干货&#xff0c;哈哈 第一个问题&#xff1a;stage架构和fa架构的区…

Linux下的常用基本指令

基本指令 前言ls 指令语法功能常用选项举例注意要点关于拼接关于 -a关于文件ls与/的联用ls与根目录ls与任意文件夹ls与常用选项与路径 ls -d与ls -ldls与ll pwd命令语法功能常用选项注意要点window与Linux文件路径的区别家目录 cd 指令语法功能举例注意要点cd路径.. .相对路径与…

【中级软件设计师】上午题12-软件工程(1):软件工程模型、敏捷方法、软件需求、系统设计

上午题12-软件工程&#xff08;1&#xff09; 1 软件过程1.1 CMM 能力成熟度模型1.1 CMMI (建议直接看思维导图&#xff09; 2 软件过程模型2.1 瀑布模型2.2 增量模型2.3 演化模型2.3.1 原型模型2.3.2 螺旋模型 2.5 喷泉模型 3 统一过程&#xff08;UP&#xff09;模型4 敏捷方…

Kafka报错ERROR Exiting Kafka due to fatal exception during startup

报错&#xff1a; ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$) kafka.common.InconsistentClusterIdException: The Cluster ID FSzSO50oTLCRhRnRylihcg doesnt match stored clusterId Some(0oSLohwtQZWbIi73YUMs8g) in meta.properties. Th…

zabbix监控Tongweb7企业版(by lqw+sy)

此贴参考zabbix通过jmx监控Tongweb7企业版&#xff08;by lqw&#xff09;&#xff0c;是在此帖子的基础和同事整理的文档基础上重新部署验证的优化版&#xff0c;使用的是centos7。 优点&#xff1a; 1.不需要通过jmx配置进行监控。&#xff08;jmx配置需要修改tongweb的配置…

鸿蒙开发实战

问题&#xff1a; 1&#xff0c;鸿蒙DevEco Studio 机测试Failure[INSTALL_FAILED_APP_SOURCE_NOT_TRUSTED] 勾选☑️ 勾选自动签名&#xff0c;然后自动跳转登录华为网站&#xff0c;登录即可。 //持续更新&#xff01;

长图高效切割新体验:支持按随机宽度灵活裁切,释放无限创意与效率

图像的传播已经成为我们日常生活的一部分。而长图&#xff0c;作为一种特殊的图像形式&#xff0c;其独特的展示方式能够吸引更多的目光。但是&#xff0c;如何将长图高效切割&#xff0c;以展现其独特的魅力呢&#xff1f;现在&#xff0c;我们为您带来了一款支持按随机宽度切…