实战:MySQL数据同步神器之Canal

news2024/9/22 9:36:01

1.概叙

场景一:数据增量实时同步

项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。

那么这个时候问题来了,怎么把MySQL中增量数据同步到ES?

场景二:缓存一致性问题

Java web应用性能分析之【高并发之缓存-多级缓存】_java多级缓存-CSDN博客

在前面文章中有提到这个场景,如何保证redis、EhCache、MySQL数据一致?

我们都知道作为数据库写操作,是不通过缓存的。假设商品服务实例 1 将 1 号商品价格调整为 80 元,这会衍生一个新问题:如何主动向应用程序推送数据变更的消息来保证它们也能同步更新缓存呢?

针对上面两种场景,可以看看canal的解决方案。

什么是Canal?

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。项目起源于阿里巴巴内部对于跨机房数据同步的需求,通过解析MySQL的二进制日志(Binary Log),Canal能够捕获并推送数据库的变更事件,满足了诸如数据库镜像、实时备份、索引实时维护等多种业务场景的需求。

GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

Home · alibaba/canal Wiki · GitHub

支持范围

Canal当前支持MySQL数据库的多个版本,包括但不限于5.1.x、5.5.x、5.6.x、5.7.x及8.0.x,同时也兼容阿里云RDS等云数据库服务,为用户提供了广泛的数据库兼容性保障。

部分支持MySQL体系数据库:Mariadb 10.x、PolarDB-X

主要特性

高性能与低延迟:Canal 1.1.x版本进行了深度优化,性能提升高达150%。
Prometheus监控:原生集成Prometheus监控,便于系统健康状况的跟踪。
消息系统集成:直接支持Kafka、RocketMQ消息投递,便于与大数据平台对接。
云数据库支持:无缝对接阿里云RDS,解决了自动主备切换及离线Binlog解析问题。
Docker部署:提供Docker镜像,简化部署流程。
WebUI管理:Canal-Admin工程引入WebUI,实现动态配置、任务管理与日志查看等功能。

2.Canal原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志(binary log), 日志中的记录叫做二进制日志事件(binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映到它自己的数据

Canal工作原理

Canal巧妙地模拟了MySQL主从复制的机制。具体而言:

  1. 伪装为MySQL Slave:canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  2. 获取Binary Log:MySQL Master接收到请求后,开始推送Binary Log给Canal。
  3. 解析日志事件:Canal解析接收到的Binary Log,将数据变更信息转换为易于处理的结构化数据。canal 解析 binary log 对象(原始为 byte 流),转换为json格式。
  4. 数据同步:Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,同步数据到 ES、KFK、HBase、RocketMQ、Pulsar等。

优点: 可以完全和业务代码解耦,增量日志订阅。

缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。

Canal总体架构

对应这些软件包:

  1. deployer包:即canal server,负责从master库同步binlog,将数据通过tcp的方式同步给Adapter适配器,经过Adapter适配同步给目标库;通过MQ将数据同步给canal client或目标库,canal client也可以再同步给目标库。
    1. server包官方介绍:https://github.com/alibaba/canal/wiki/DevGuide
  2. adapter包:给目标库同步数据。目前支持:Hbase、RDB、ES。
    1. adapter包官方介绍:https://github.com/alibaba/canal/wiki/ClientAdapter
  3. admin包:canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide
    1. admin包官方介绍:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
  4. canal.client:消费server数据
    1. Canal设计了client-server架构,支持多种语言客户端通过protobuf 3.0协议与之交互,官方及社区提供了以下客户端:
    2. Java客户端:ClientExample
    3. C#客户端:CanalSharp
    4. Go客户端:canal-go
    5. Python客户端:canal-python
    6. PHP客户端:canal-php
    7. Rust客户端:canal-rs
    8. Node.js客户端:canal-nodejs
  5. 除了基础功能,Canal还支持丰富的进阶特性和周边生态工具,如:

    Canal-Admin:提供Web界面管理Canal实例,实现配置、监控和运维的可视化操作。
    canal2sql:一个工具项目,能根据Binlog生成SQL,便于数据迁移或备份。
    Otter:Canal的消费端开源项目,用于数据同步与数据集成。

Canal高可用架构

整个 HA 机制的控制主要是依赖了zookeeper的两个特性:watcher、EPHEMERAL节点。canal的 HA 机制实现分为两部分,canal server 和 canal client分别有对应的实现。

canal server实现流程如下:

  • 1、canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)。
  • 2、创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态。
  • 3、一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动instance。
  • 4、canal client 每次进行connect时,会首先向 zookeeper 询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。

为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。

依赖:

  • JDK1.8
  • MySQL:用于canal-admin存储配置和节点等相关数据
  • Zookeeper

3.Canal实战

准备环境

名称版本
MySQL5.7
elasticsearch7.17.9
canal1.1.7
jdk1.8
  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

    下载 canal

本次操作只需要下载admin、deployer两个包。

下载后解压即可用,当然,要运行起来的话,需要配置jdk1.8,这里就不再多说。

创建admin的数据库:conf/canal_manager.sql

启动admin前,先完成建库和见表(包括admin的默认登录账号密码  admin/123456),否则启动报错。主要是如下六张表。

修改配置文件

修改server的配置文件:conf/canal.properties 和conf/example/instance.properties

切记数据库和配置文件中的密码要一致。

修改admin的配置文件:conf/application.yml

分别启动admin和server

启动server:bin/startup.bat

启动admin:bin/startup.bat

登录admin:http://192.168.1.4:8089/#/login?redirect=%2Fdashboard

默认账号密码:admin/123456

配置java客户端

引入依赖

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

客户端代码

package com.zxx.study.base.canal;


import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.SneakyThrows;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author zhouxx
 * @create 2024-08-01 21:59
 */
public class CanalClient  {

    @SneakyThrows
    public static void main(String[] args) {
        try {
            // 创建canal客户端,单链接模式
            CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.4",
                    11111), "example", "", "");
            // 创建连接
            canalConnector.connect();
            while (true) {
                // 订阅数据库
                // canalConnector.subscribe("mall");

                // 获取数据
                Message message = canalConnector.get(100);

                // 获取Entry集合
                List<CanalEntry.Entry> entries = message.getEntries();

                // 判断集合是否为空,如果为空,则等待一会继续拉取数据
                if (entries.size() <= 0) {
//                System.out.println("当次抓取没有数据,休息一会。。。。。。");
                    Thread.sleep(1000);
                } else {
                    // 遍历entries,单条解析
                    for (CanalEntry.Entry entry : entries) {

                        //1.获取表名
                        String tableName = entry.getHeader().getTableName();

                        //2.获取类型
                        CanalEntry.EntryType entryType = entry.getEntryType();

                        //3.获取序列化后的数据
                        ByteString storeValue = entry.getStoreValue();

                        //4.判断当前entryType类型是否为ROWDATA
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {

                            //5.反序列化数据
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

                            //6.获取当前事件的操作类型
                            CanalEntry.EventType eventType = rowChange.getEventType();

                            //7.获取数据集
                            List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();

                            //8.遍历rowDataList,并打印数据集
                            for (CanalEntry.RowData rowData : rowDataList) {

                                JSONObject beforeData = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeData.put(column.getName(), column.getValue());
                                }

                                JSONObject afterData = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterData.put(column.getName(), column.getValue());
                                }

                                //数据打印
                                System.out.println("Table:" + tableName +
                                        ",EventType:" + eventType +
                                        ",Before:" + beforeData +
                                        ",After:" + afterData);
                                /**
                                 * Table:test_user,EventType:UPDATE,Before:{"name":"1111221","id":"1"},After:{"name":"11tom","id":"1"}
                                 * Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx","id":"17"}
                                 * Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx1","id":"18"}
                                 * */
                            }
                        }
                    }
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }

}

运行效果:在数据库里面忝删改查,均可以在客户端中打印出来

cancel框架同步mysql数据到kafka

参考:cancel框架同步mysql数据到kafka_mysql cancel-CSDN博客

利用canal进行MySQL到ES的数据实时同步

参考:利用canal进行MySQL到ES的数据实时同步_canal es-CSDN博客

结合上图,至此,场景一和场景二中的问题,均可以解决。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1974067.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java面试题--JVM大厂篇之破解Java性能瓶颈!深入理解Parallel GC并优化你的应用

目录 引言&#xff1a; 正文&#xff1a; 1. 理解Parallel GC的工作原理 2. 配置Parallel GC 3. 监控和分析GC日志 4. 常见调优技巧 5. 持续迭代和优化 结束语&#xff1a; 补充考虑 1. 综合考虑吞吐量与响应时间 2. 评估和优化垃圾回收频率 3. 动态调整与自适应策…

定期自动巡检,及时发现机房运维管理中的潜在问题

随着信息化技术的迅猛发展&#xff0c;机房作为企业数据处理与存储的核心场所&#xff0c;其运维管理的复杂性和挑战性也与日俱增。为确保机房设备的稳定运行和业务的连续性&#xff0c;运维团队必须定期进行全面的巡检。然而&#xff0c;传统的手工巡检方式不仅效率低下&#…

【卷积神经网络】基于CIFAR10数据集实现图像分类【构建、训练、预测】

文章目录 1、内容简介2、CIFAR10 数据集2.1、数据集概述2.2、代码使用2.2.1、查看数据集基本信息2.2.2、数据加载器2.2.3、完整代码 3、搭建图像分类网络&#x1f53a;3.1、网络结构⭐3.2、代码构建网络⭐ 4、编写训练函数4.1、多分类交叉熵损失函数&#x1f53a;4.2、Adam&…

泛微开发修炼之旅--41Ecology基于触发器实现增量数据同步(人员、部门、岗位、人员关系表、人岗关系表)

一、需求背景 我们在项目上遇到一个需求&#xff0c;需要将组织机构数据&#xff08;包含人员信息、部门信息、分部信息、人岗关系&#xff09;生成的增量数据&#xff0c;实时同步到三方的系统中&#xff0c;三方要求&#xff0c;只需要增量数据即可。 那么基于ecology系统&a…

【C++高阶】:C++11的深度解析上

✨ 心似白云常自在&#xff0c;意如流水任东西 &#x1f30f; &#x1f4c3;个人主页&#xff1a;island1314 &#x1f525;个人专栏&#xff1a;C学习 &#x1f680; 欢迎关注&#xff1a;&#x1f44d;点赞 &#x1f4…

数说故事|引爆社媒的森贝儿IP,品牌如何实现流量变现?

以可爱、雅痞、贱萌......的外表加魔性舞姿出圈的可爱小狗——森贝儿贵宾犬Milo&#xff0c;用“可爱微怒”的表情演绎着当代打工人的“疯态”&#xff0c;并迅速晋升成不少打工人高频使用的表情包。 最近几年&#xff0c;“萌系”爆款IP频出&#xff0c;用小动物的形象、可爱…

一键生成视频并批量上传视频抖音、bilibili、腾讯(已打包)

GenerateAndAutoupload Github地址&#xff1a;https://github.com/cmdch2017/GenerateAndAutoupload 如何下载&#xff08;找到最新的release&#xff09; https://github.com/cmdch2017/GenerateAndAutoupload/releases/download/v1.0.1/v1.0.1.zip 启动必知道 conf.py …

Redis学习[5] ——Redis过期删除和内存淘汰

六、Redis过期键值删除 6.1 Redis的过期键值删除策略 6.1.1 什么是过期键值删除&#xff1f; Redis中是可以对key设置过期时间的&#xff0c;所以需要有相应的机制将已过期的键值对删除&#xff0c;也就是**过期键值删除策略。Redis会用一个过期字典&#xff08;expires dic…

如何改网络的ip地址:实用方法与步骤解析

在数字化时代&#xff0c;网络IP地址作为设备在互联网上的唯一标识&#xff0c;其重要性不言而喻。然而&#xff0c;在某些特定场景下&#xff0c;如网络测试、隐私保护或突破地域限制等&#xff0c;我们可能需要更改网络IP地址。那么&#xff0c;如何安全、有效地实现这一操作…

学习日志:update 没加索引会锁全表

文章目录 前言一、为什么会发生这种的事故如何避免这种事故的发生&#xff1f;总结 前言 在线上执行一条 update 语句修改数据库数据的时候&#xff0c;where 条件没有带上索引&#xff0c;导致业务直接崩了 为什么会发生这种的事故&#xff1f; 又该如何避免这种事故的发生&a…

html+css練習:iconfont使用

1.網址地址&#xff1a;https://www.iconfont.cn/search/index 2.註冊登錄&#xff0c;將需要的圖標添加到購物車 3.下載代碼 4.下載后的代碼有一個html頁面&#xff0c;裡面有詳細的使用方式

Linux进程间通信学习2

文章目录 共享内存信号信号概述以及种类信号的处理信号相关函数&#xff08;简单&#xff09;运用小demo实现ctrlc无法终止进程使用kill函数在程序内部实现一个进程杀死另外一个进程 信号相关函数高级版运用函数小demo 信号量信号量相关函数运用小demo: 共享内存 相比于前三个…

基于微信小程序的宠物服务平台(系统源码+lw+部署文档+讲解等)

文章目录 目录 详细视频演示 系统详细设计截图 微信小程序系统的实现 1.1系统前台功能的实现 2.1微信小程序开发环境搭建 2.2微信开发者工具 2.3程序应用相关技术和知识 2.3.1小程序目录结构以及框架介绍 2.3.2 Java技术 2.3.3 MySQL数据库 2.3.4 SSM框架 源码获…

构建铁路安全防线:EasyCVR视频+AI智能分析赋能铁路上道作业高效监管

一、方案背景 随着我国铁路特别是高速铁路的快速发展&#xff0c;铁路运营里程不断增加&#xff0c;铁路沿线的安全环境对保障铁路运输的安全畅通及人民群众的生命财产安全具有至关重要的作用。铁路沿线安全环境复杂多变&#xff0c;涉及多种风险因素&#xff0c;如人员入侵、…

函数递归超详解!

目录 1.什么是递归调用&#xff1f; 直接调用 间接调用 2.什么是递归&#xff1f; 3.递归举例 3.1求n!的阶乘 3.1.1.非递归法 3.1.2.递归法 3.1.2.1分析和代码实现 3.2顺序打印一个整数的每一位 3.2.1分析和代码实现 4.递归与迭代 4.1举例&#xff1a;斐波那契数列 …

开放式耳机更适合运动的时候使用?开放式耳机推荐指南

开放式耳机确实非常适合运动时使用&#xff0c;原因主要有以下几点。 首先&#xff0c;保持对外界的感知是很重要的一点。在运动的时候&#xff0c;我们需要听到周围的环境声音&#xff0c;比如车辆的行驶声、行人的呼喊等&#xff0c;以便及时做出反应&#xff0c;保证自身安全…

【MySQL】索引概念解析

1.什么是索引&#xff1f; MySQL中的索引是一种数据结构&#xff0c;用于帮助MySQL数据库管理系统快速查询数据。索引的主要目的是提高数据检索的速度&#xff0c;减少数据库系统需要扫描的数据量。 优点&#xff1a; 索引可以极大的提高数据检索效率&#xff0c;降低数据库…

【Nuxt】配置

Nuxt 配置 nuxt.config.ts 里面可以添加相关配置&#xff1a; runtimeConfig 运行时配置。 // https://nuxt.com/docs/api/configuration/nuxt-config export default defineNuxtConfig({compatibilityDate: 2024-04-03,devtools: {enabled: true},runtimeConfig: {appKey: …

手拉手模型笔记and一线三角笔记

手拉手模型 基本需要&#xff1a;两个 顶角相等 的 等腰 三角形 共 顶点 反手拉手: 等边 等腰 R t △ 等边\\等腰Rt△ 等边等腰Rt△ 左手拉左手&#xff0c;右手拉右手( − 红线 − \textcolor{red}{-红线-} −红线−): △ A B D ≅ △ A C E ( S A S ) △ABD \cong △ACE(S…

一次多波束和浅地层处理的经历—信标机出问题?

最近处理多波束和浅地层时&#xff0c;一个从来没有过的问题出现了。 多波束数据(.pds)是由PDS2000采集的&#xff0c;使用设备型号为T50P。浅地层数据(.raw)是有SESWIN采集的&#xff0c;使用设备型号为SES2000 Standard。 1、多波束处理 多波束数据采用CARIS11.3处理的。船…