Mysql实时数据同步工具Alibaba Canal 使用

news2025/1/27 13:06:59

目录

  • Mysql实时数据同步工具Alibaba Canal 使用
    • Canal是什么?
      • 工作原理
      • 重要版本更新说明
    • 环境准备
    • 安装Canal
      • window
    • Java : Canal Client 集成
      • 依赖
      • 编码
    • 工作流程
    • 其他学习canal资料

个人主页: 【⭐️个人主页】
需要您的【💖 点赞+关注】支持 💯


在这里插入图片描述

Mysql实时数据同步工具Alibaba Canal 使用

📖 本文核心知识点:

  • Canal 是什么
  • 安装Canal 服务
  • 使用Canal 客户端
  • 原生集成数据MQ
  • 同步数据客户端服务【业务】

Canal是什么?

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
    canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

重要版本更新说明

canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:

  • 整体性能测试&优化,提升了150%. #726 参考: Performance
  • 原生支持prometheus监控 #765 Prometheus QuickStart
  • 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart
  • 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
  • 原生支持docker镜像 #801 参考: Docker QuickStart
    canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力, Canal admin guide

环境准备

安装Canal

DownLoad

版本: 1.1.7

window

  1. 下载 tar.gz包,解压
    GitHub Canal
  2. 配置文件设置:
    解压完后修改配置文件
    查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.usercanal.admin.passwd是客户端连接的账号
    在这里插入图片描述
    再打开conf/example/ instance.properties, master.address填数据库地址,dbUsernamedbPassword是数据库账号,flter.regex可以用来过滤数据库,默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔
    在这里插入图片描述
  3. 启动服务 bin/startup.bat
    log/canal.log
    在这里插入图片描述

Java : Canal Client 集成

依赖

    implementation 'com.alibaba.otter:canal.client:1.1.7'
    implementation 'com.alibaba.otter:canal.protocol:1.1.7'

具体的数据库数据变化 业务实现方面需要 自己手动去实现,仅展示自己使用的部分。

需要注意: 如果是多个客户端同时使用,要注意:多个客户端会出现某个客户端 把消息全部消费,而别的客户端没有消息消费的情况,这里需要特别注意

编码

package com.kongxiang.infrastructure.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ThreadUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;


/**
 * @author 孔翔
 * @since 2023-12-27
 * copyright for author : 孔翔 at 2023-12-27
 * study-spring3
 */
@Component
@Slf4j
public class CanalService {

    private String canalMonitorHost = "localhost";
    private int canalMonitorPort = 11111;

    private String filterRegexTable = "xkongdb\\..*";


    private final static int BATCH_SIZE = 10000;


    @Async("canalTask")
    public void startCanal() {
        Consumer<CanalConnector> connectorConsumer = new ConsumerTask();
        while (true) {
            executeCanal(connectorConsumer);
            try {
                //防止频繁访问数据库链接: 线程睡眠 10秒
                ThreadUtils.sleep(Duration.ofSeconds(10));
                log.debug("防止频繁访问数据库链接: 线程睡眠 10秒");
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

    public void executeCanal(Consumer<CanalConnector> runnable) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");
        try {
            //打开连接
            connector.connect();
            log.debug("数据库检测连接成功!" + filterRegexTable);
            //订阅数据库表,全部表q
            connector.subscribe(filterRegexTable);
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            if (runnable != null) {
                runnable.accept(connector);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("成功断开监测连接!尝试重连");
        } finally {
            connector.disconnect();
        }
    }

    public static class ConsumerTask implements Consumer<CanalConnector> {
        public void handleMessage(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
            for (CanalEntry.Entry entry : entries) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                //根据数据库名获取租户名
                String databaseName = entry.getHeader().getSchemaName();
                String tableName = entry.getHeader().getTableName();
                log.info("数据库: {}, 表名: {}", databaseName, tableName);
                // 获取类型
                CanalEntry.EntryType entryType = entry.getEntryType();

                // 获取序列化后的数据
                ByteString storeValue = entry.getStoreValue();
                if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    // 反序列化数据
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                    // 获取当前事件的操作类型
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE
                            || eventType == CanalEntry.EventType.DELETE) {
                        // 获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        // 遍历rowDataList,并打印数据集
                        for (CanalEntry.RowData rowData : rowDataList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            // 变更前数据
                            for (CanalEntry.Column column : beforeColumnsList) {
                                log.info("变更前数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
                            }
                            // 变更后数据
                            for (CanalEntry.Column column : afterColumnsList) {
                                log.info("变更后数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
                            }
                        }
                    }
                }
            }
        }


        @Override
        public void accept(CanalConnector connector) {
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                } else {
                    try {
                        log.debug("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());
                        handleMessage(message.getEntries());
                    } catch (Exception e) {
                        connector.rollback(batchId); // 处理失败, 回滚数据
                    }
                }
                // 提交确认
                connector.ack(batchId);
            }
        }
    }
}

测试代码

@Test
public class CanalTest {

    @Test
    public void testListener() {
        CanalService canalService = new CanalService();
        canalService.startCanal();
    }
}

测试结果

  1. xkongdb的数据表的数据进行 insert,update,delete的时候,就会触发canal任务执行。
  2. 日志 在这里插入图片描述

工作流程

在这里插入图片描述

其他学习canal资料

【开源实战】阿里开源MySQL中间件Canal快速入门
mysql的binlog开启方式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1338704.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

7.7、kali linux环境下搭建DVWA

目录 一、资料下载准备工作 1.1、DVWA源代码下载 二、开启Apache、mysql服务 2.1、下载Apache2文件 2.2、开启Apache2服务 方法一&#xff1a;开启Apache2服务&#xff08;手动&#xff09; 方法二&#xff1a;开启Apache2服务&#xff08;系统自启动&#xff09; 2.3、…

Linux进行模型微调前的环境准备

在Linux机器上对模型进行微调前&#xff0c;首先需要准备环境&#xff0c;即安装相关的软件。因为linux是一个无界面操作系统&#xff0c;软件安装完成后&#xff0c;还需要有便捷的交互方式编写脚本&#xff0c;调试脚本。此篇博客将专门介绍如何快速安装所需依赖软件&#xf…

黑马程序员SSM框架-Spring

视频链接&#xff1a;Spring-00-Spring课程介绍_哔哩哔哩_bilibili Spring Framework系统架构以及学习顺序 核心概念&#xff08;IoC、DI、Bean&#xff09; IoC入门案例 导入坐标 提供需要被管理的类&#xff08;Dao&#xff09;和需要被注入的类&#xff08;Service&#x…

Unity中Shader裁剪空间推导(在Shader中实现)

文章目录 前言一、在Shader中&#xff0c;手动把正交相机的坐标转化到裁剪空间1、我们在属性面板定义一个变量&#xff0c;用于传入摄像机的信息2、获取h、r、w、n、f3、获取OpenGL下的转化矩阵4、 获取DirectX下的转化矩阵5、手动将观察空间下的坐标转换到裁剪空间下6、这里为…

Java 对象内存布局

在虚拟机中&#xff0c;Java对象在内存中的布局可以分为三块&#xff1a; 对象头&#xff08;Header&#xff09; &#xff1a;包含 markword 标记字段和类型指针&#xff0c;32 位上大小是 8 个字节&#xff0c;64 位 16 个字节&#xff0c;实例数据&#xff08;Instance Dat…

sql_lab之sqli中的搜索型注入

搜索型注入 原理是运用模糊查询&#xff1a; select * from users where username like %a% 1.找到具有模糊查询的搜索框的注入点 2.构造闭合 因为模糊查询的代码是 select * from users where username like %a% 所以应该 鱼%’ -- s 判断构造闭合的函数是否正确 鱼%…

【DeepLearning】Deep Residual Learning for Image Recognition恺神大作学习

[TOC] Deep Residual Learning for Image Recognition 论文 1. 文章主要想解决什么问题&#xff0c;用了什么方法 深度神经网络在训练过程中的3个关键问题&#xff1a; 梯度消失/爆炸问题&#xff1a;随着网络层数的增加&#xff0c;梯度在反向传播过程中可能会变得非常小&a…

浅谈政企风险防控体系

政企风控体系是集团客户部在生产运营过程中对政企产品可能面临的风险进行监测&#xff0c;针对监测发现的风险&#xff0c;制定相应的管控措施&#xff0c;并跟进落实&#xff0c;以最大限度地减少或消除风险对组织的影响的管控体系。 本文之所以称为“浅谈”是因为文中主要围绕…

一条查询SQL是如何执行的?更新、新增、删除呢?

你好&#xff0c;我是田哥 本文共14629字&#xff0c;读完预计需要37分钟&#xff0c;建议先收藏。 大部分朋友估计都只知道写sql然后执行&#xff0c;但是并不知道MySQL背后到底是怎么实现的。 八股文中也有这么一道题&#xff1a;在MySQL中&#xff0c;一条SQL到底是如何执行…

【大数据存储与处理】开卷考试总复习笔记

文章目录 实验部分一、 HBase 的基本操作1. HBase Shell入门2. HBase创建数据库表3. HBase数据操作4. HBase删除数据库表5. HBase Python基本编程 before二、 HBase 过滤器操作1.创建表和插入数据2.行键过滤器3.列族与列过滤器4.值过滤器5.其他过滤器6.python hbase 过滤器编程…

通过 conda 安装 的 detectron2

从 detectron2官网 发现预编译的版本最高支持 pytorch1.10、cuda11.3。&#xff08;2023-12-26&#xff09; 1、安装 conda 环境。 conda create --name detectron2 python3.8 2、安装 pytorch1.10 和 cuda11.3。 pip3 install torch1.10.0cu113 torchvision0.11.1cu113 torc…

Hadoop之HDFS 详细教程

1、HDFS概述 Hadoop 分布式系统框架中&#xff0c;首要的基础功能就是文件系统&#xff0c;在 Hadoop 中使用FileSystem 这个抽象类来表示我们的文件系统&#xff0c;这个抽象类下面有很多子实现类&#xff0c;究竟使用哪一种&#xff0c;需要看我们具体的实现类&#xff0c;在…

CTFshow-pwn入门-栈溢出pwn41-pwn42

pwn41 我们首先将pwn文件下载下来&#xff0c;拖入到虚拟机查看一下文件的保护信息。 chmod x pwn checksec pwn该文件只开启了栈不可执行&#xff0c;并且文件是32位的。 我们把文件托到ida32中查看一下反编译代码。 int __cdecl main(int argc, const char **argv, const c…

大数据开发之Sqoop详细介绍

测试环境 CDH 6.3.1 Sqoop 1.4.7 一.Sqoop概述 Apache Sqoop&#xff08;SQL-to-Hadoop&#xff09;项目旨在协助RDBMS与Hadoop之间进行高效的大数据交流。用户可以在 Sqoop 的帮助下&#xff0c;轻松地把关系型数据库的数据导入到 Hadoop 与其相关的系统 (如HBase和Hive)中&…

呼叫中心知识库管理

呼叫中心向客户提供所需服务与支持的过程中&#xff0c;会遇到形形色色的客户和各式各样的问题&#xff0c;需要客服人员做出回答。客户对于客服人员的答复是否满意直接关系着呼叫中心的业绩&#xff0c;甚至企业的兴衰。而OKCC的知识库管理正是基于呼叫中心的这一需求而存在&a…

Python range函数新手指南:详细解析内部机制

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com range函数是Python中常用的内置函数之一&#xff0c;用于生成一系列连续的整数。本文将深入探讨range函数的内部实现&#xff0c;以揭示其工作原理和效率。 range函数的基本用法 range函数的基本用法。它通常使…

async和await的使用

async和await是promise的一种语法糖,也就是更简单易懂的写法。 在很多项目中,你会经常看到async和await的配合使用,看到原始的promise写法反而不多,就是因为async-await这种写法是用同步的语法去实现异步的逻辑。 基础使用 原生promise写法 let value nulllet proFn new P…

最新腾讯云2核2G轻量服务器性价比首选62元一年!

继昨天阿里云2核2G3M带宽轻量应用服务器降价到63元后&#xff0c;腾讯云迅速做出响应&#xff0c;腾讯云轻量2核2G3M服务器降价到62元一年&#xff0c;1元之差&#xff0c;你选择阿里云还是腾讯云&#xff1f;值得一提的是&#xff0c;阿里云不限制月流量&#xff0c;但是腾讯云…

【廖雪峰Java】Java基础知识

学习课程&#xff1a;廖雪峰的官方网站&#xff1a;https://www.liaoxuefeng.com/ 1、简介 1.1、Java之父 詹姆斯高斯林&#xff08;James Gosling&#xff09; 1.2、Java三个不同版本 Java SE&#xff1a;Standard EditionJava EE&#xff1a;Enterprise EditionJava ME&a…

openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作

文章目录 openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作174.1 事务隔离说明174.2 写入和读写操作174.3 并发写入事务的潜在死锁情况 openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作 174.1 事务隔离说…