1.5 Canal 数据同步工具详细教程

news2025/1/1 11:37:38

欢迎来到我的博客,很高兴能够在这里和您见面!欢迎订阅相关专栏:

⭐️ 全网最全IT互联网公司面试宝典:收集整理全网各大IT互联网公司技术、项目、HR面试真题.
⭐️ AIGC时代的创新与未来:详细讲解AIGC的概念、核心技术、应用领域等内容。
⭐️ 全流程数据技术实战指南:全面讲解从数据采集到数据可视化的整个过程,掌握构建现代化数据平台和数据仓库的核心技术和方法。

文章目录

      • Canal概述
      • 架构
      • 基本工作流程
      • 使用场景
      • 优缺点
      • 部署安装
      • 使用案例
        • 实时数据同步
      • 性能优化
      • 总结

Canal概述

Canal是一款由阿里巴巴开源的、用于MySQL数据库binlog增量订阅和消费的中间件。它的设计灵感来源于MySQL主从复制机制,通过模拟MySQL Slave与Master进行交互,从而解析并获取数据库的实时变更数据。Canal可以将这些变更数据实时推送到其他系统,从而实现数据同步、数据监控等功能。

架构

Canal的架构主要包括以下几个组件:

  1. Canal Server:核心组件,负责与MySQL进行交互,解析binlog日志。
  2. Canal Client:消费者,订阅并消费Canal Server推送的binlog数据。
  3. Zookeeper:用于管理Canal Server的集群状态及分布式协调。

架构图如下:

+---------------+     +-------------+
|               |     |             |
|  MySQL Server |<--->| Canal Server|
|               |     |             |
+---------------+     +-------------+
                          |
                          v
                   +-------------+
                   | Canal Client|
                   +-------------+
                          |
                          v
                   +-------------+
                   | Other System|
                   +-------------+

基本工作流程

  1. 连接MySQL:Canal Server以MySQL Slave的身份连接到MySQL Master,获取binlog位置信息。
  2. 拉取binlog:Canal Server从MySQL Master拉取binlog日志。
  3. 解析binlog:Canal Server解析binlog日志,提取数据库变更事件。
  4. 推送数据:Canal Server将解析后的变更事件推送给Canal Client。
  5. 处理数据:Canal Client消费变更事件,并根据需要将数据同步到其他系统。

使用场景

  1. 数据同步:将MySQL数据实时同步到其他数据库或大数据平台,如Elasticsearch、Hadoop等。
  2. 数据监控:实时监控MySQL数据库的变更,进行数据统计、报警等。
  3. 缓存更新:数据库变更后,实时更新缓存数据,确保数据一致性。

优缺点

优点

  1. 实时性强:能够实时获取MySQL数据库的变更数据。
  2. 高效:直接读取binlog日志,性能开销小。
  3. 灵活性高:支持自定义数据处理逻辑,适用于多种使用场景。

缺点

  1. 复杂度高:需要对MySQL binlog机制有一定了解,配置相对复杂。
  2. 依赖性强:依赖于MySQL主从复制机制,MySQL版本不兼容可能会带来问题。

部署安装

  1. 下载Canal:从Canal GitHub下载最新版本。
  2. 配置Canal Server:修改conf目录下的配置文件,配置MySQL连接信息、binlog位置信息等。
  3. 启动Canal Server:通过命令bin/startup.sh启动Canal Server。
  4. 配置Canal Client:编写Canal Client代码,订阅Canal Server的变更事件。

使用案例

实时数据同步

假设我们要将MySQL数据库的订单数据实时同步到Elasticsearch。首先,我们需要在MySQL中配置binlog,并启动Canal Server。

MySQL配置(my.cnf)

[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW

Canal Server配置(example/instance.properties)

canal.instance.mysql.slaveId=1234
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=yourpassword
canal.instance.connectionCharset=UTF-8
canal.instance.tsdb.enable=true

Canal Client代码

我们使用Java编写一个Canal Client,将MySQL数据同步到Elasticsearch。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    private static final String INDEX_NAME = "orders";

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        RestHighLevelClient esClient = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));

        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();

            while (true) {
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();

                if (batchId != -1 && entries.size() > 0) {
                    processEntries(entries, esClient);
                }

                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
            try {
                esClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private static void processEntries(List<CanalEntry.Entry> entries, RestHighLevelClient esClient) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry.toString(), e);
                }

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
                        handleInsert(rowData, esClient);
                    } else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                        handleUpdate(rowData, esClient);
                    } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                        handleDelete(rowData, esClient);
                    }
                }
            }
        }
    }

    private static void handleInsert(CanalEntry.RowData rowData, RestHighLevelClient esClient) {
        // Assuming the table has columns id, order_id, and amount
        String id = "";
        String orderId = "";
        String amount = "";

        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
            switch (column.getName()) {
                case "id":
                    id = column.getValue();
                    break;
                case "order_id":
                    orderId = column.getValue();
                    break;
                case "amount":
                    amount = column.getValue();
                    break;
            }
        }

        IndexRequest request = new IndexRequest(INDEX_NAME).id(id).source(
                "{ \"order_id\": \"" + orderId + "\", \"amount\": \"" + amount + "\" }", XContentType.JSON);

        try {
            esClient.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void handleUpdate(CanalEntry.RowData rowData, RestHighLevelClient esClient) {
        // Handle update similarly to insert, adjusting for changes
        handleInsert(rowData, esClient);
    }

    private static void handleDelete(CanalEntry.RowData rowData, RestHighLevelClient esClient) {
        String id = "";

        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
            if (column.getName().equals("id")) {
                id = column.getValue();
                break;
            }
        }

        DeleteRequest request = new DeleteRequest(INDEX_NAME, id);

        try {
            esClient.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

性能优化

  1. 增加Canal Server实例:通过增加Canal Server实例,提高数据处理能力。
  2. 优化binlog解析:定期清理无用的binlog文件,减少解析时间。
  3. 合理配置内存和线程:根据业务需求,合理配置Canal Server的内存和线程数,提高并发处理能力。

总结

Canal是一款强大的MySQL binlog增量订阅和消费中间件,通过模拟MySQL Slave与Master的交互,实现实时数据同步和监控。它具有高效、实时的优点,适用于多种数据同步和监控场景。然而,Canal的配置和使用相对复杂,用户需要对MySQL binlog机制有一定了解。通过合理的配置和性能优化,可以充分发挥Canal的优势,实现高效的数据处理和同步。


💗💗💗 如果觉得这篇文对您有帮助,请给个点赞、关注、收藏吧,谢谢!💗💗💗

👇扫👇 码👇+ V👇获取👇更多👇福利👇
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1883232.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面向阿克曼移动机器人(自行车模型)的LQR(最优二次型调节器)路径跟踪方法

线性二次调节器&#xff08;Linear Quadratic Regulator&#xff0c;LQR&#xff09;是针对线性系统的最优控制方法。LQR 方法标准的求解体系是在考虑到损耗尽可能小的情况下, 以尽量小的代价平衡其他状态分量。一般情况下&#xff0c;线性系统在LQR 控制方法中用状态空间方程描…

工程化:Commitlint / 规范化Git提交消息格式

一、理解Commitlint Commitlint是一个用于规范化Git提交消息格式的工具。它基于Node.js&#xff0c;通过一系列的规则来检查Git提交信息的格式&#xff0c;确保它们遵循预定义的标准。 1.1、Commitlint的核心功能 代码规则检查&#xff1a;Commitlint基于代码规则进行检查&a…

16_C语言编程基础

目录 C语言入门 程序段和代码段 C语言入门实例 英文分号(;) 注释 标识符 保留字 C中的空格 C数据类型 基本类型 sizeof获取存储字节 void类型 变量 C数组 C枚举 C中的左值(lvalue)和右值(rvalue) C常量 变量存储类型 C常用关键字(保留字) 宏定义#define co…

视频孪生助力智慧工厂:可视化安防管理与报警告警

在当今快速迭代的工业4.0时代&#xff0c;智慧工厂已成为提升生产效率、优化资源配置的关键所在。面对日益复杂的生产环境和多元化的业务需求&#xff0c;如何构建一个高效、智能且具备强大适应能力的智慧工厂也成为了众多厂商关注的焦点。为了满足工业制造的转型需求&#xff…

element el-table表格切换分页保留分页数据+限制多选数量

el-table表格并没有相关的方法来禁用表头里面的多选按钮 那么我们可以另辟蹊径&#xff0c;来实现相同的多选切换分页&#xff08;保留分页数据&#xff09; 限制多选数量的效果 <el-table:data"tableData"style"width: 100%">// 不使用el-talbe自带…

DX-11A信号继电器 0.5A 柜内板前接线 约瑟JOSEF

DX-11,11A,11B,11C型信号继电器 DX-11信号继电器 DX-11B信号继电器 DX-11A信号继电器 DX-11C信号继电器 1 用途 该继电器用于直流操作的保护线路中&#xff0c;作为信号指示器。 2 结构和原理 该继电器具有电磁铁和带公共点的三付动合触点及一个信号牌&#xff0c;为电…

什么是原始权益人?

摘要&#xff1a;每天学习一点金融小知识 原始权益人&#xff0c;在资产证券化&#xff08;ABS&#xff09;和公募REITs等金融产品中&#xff0c;指的是证券化基础资产的原始所有者&#xff0c;即金融产品的真正融资方。他们是按照相关规定及约定向资产支持专项计划转移其合法拥…

Victor CMS v1.0 SQL 注入漏洞(CVE-2022-28060)

前言 CVE-2022-28060 是 Victor CMS v1.0 中的一个SQL注入漏洞。该漏洞存在于 /includes/login.php 文件中的 user_name 参数。攻击者可以通过发送特制的 SQL 语句&#xff0c;利用这个漏洞执行未授权的数据库操作&#xff0c;从而访问或修改数据库中的敏感信息。 漏洞详细信…

武汉星起航:跨境电商流量红利爆发,2023年出海企业迎突破增长

在数字时代的浪潮中&#xff0c;中国跨境电商以惊人的爆发力崭露头角&#xff0c;成为全球贸易的璀璨新星。2023年数据显示&#xff0c;跨境电商出口额高达1.83万亿元&#xff0c;同比增长19.6%&#xff0c;这一显著增速不仅刷新纪录&#xff0c;更为众多出海企业带来了前所未有…

MySQL-行级锁(行锁、间隙锁、临键锁)

文章目录 1、介绍2、查看意向锁及行锁的加锁情况3、行锁的演示3.1、普通的select语句&#xff0c;执行时&#xff0c;不会加锁3.2、select * from stu where id 1 lock in share mode;3.3、共享锁与共享锁之间兼容。3.4、共享锁与排他锁之间互斥。3.5、排它锁与排他锁之间互斥3…

TopK问题与如何在有限内存找出前几最大(小)项(纯c语言版)

目录 0.前言 1.知识准备 2.实现 1.首先是必要的HeapSort 2.造数据 其他注意事项 3.TopK的实现 0.前言 在我们的日常生活中总有排名系统&#xff0c;找出前第k个分数最高的人&#xff0c;而现在让我们用堆来在有限内存中进行实现 1.知识准备 想要实现topk问题首先我们要…

【stm32】大一上学期笔记复制

砌墙单片机 外设是什么&#xff1f; ipage 8 nx轴 128 X0-127 y0-63 PWM脉冲宽度调制 PWM脉冲宽度调制 2023年10月13日 基本特性&#xff1a;脉冲宽度调制PWM是一种对模拟信号进行数字编码的方法。广泛引用于电机控制&#xff0c;灯光的亮度调节&#xff0c;功率控制等领域…

科普文:一文搞懂jvm原理(二)类加载器

概叙 科普文&#xff1a;一文搞懂jvm(一)jvm概叙-CSDN博客 前面我们介绍了jvm&#xff0c;jvm主要包括两个子系统和两个组件&#xff1a; Class loader(类装载器) 子系统&#xff0c;Execution engine(执行引擎) 子系统&#xff1b;Runtime data area (运行时数据区域)组件&am…

类和对象【上】【C++】

P. S.&#xff1a;以下代码均在VS2019环境下测试&#xff0c;不代表所有编译器均可通过。 P. S.&#xff1a;测试代码均未展示头文件stdio.h的声明&#xff0c;使用时请自行添加。 博主主页&#xff1a;LiUEEEEE                        …

试用笔记之-收钱吧安卓版演示源代码,收钱吧手机版感受

首先下载&#xff1a; https://download.csdn.net/download/tjsoft/89499105 安卓手机安装 如果有收钱吧帐号输入收钱吧帐号和密码。 如果没有收钱吧帐号点我的注册 登录收钱吧帐号后就可以把手机当成收钱吧POS机用了&#xff0c;还可以扫客服的付款码哦 源代码技术交流QQ:42…

Nuxt3 的生命周期和钩子函数(七)

title: Nuxt3 的生命周期和钩子函数&#xff08;七&#xff09; date: 2024/6/30 updated: 2024/6/30 author: cmdragon excerpt: 摘要&#xff1a;文章阐述了Nuxt3中Nitro生命周期钩子的使用&#xff0c;如nitro:config自定义配置、nitro:init注册构建钩子、nitro:build:be…

Python自动化,实现自动登录并爬取商品数据,实现数据可视化

关于如何使用Python自动化登录天 猫并爬取商品数据的指南&#xff0c;我们需要明确这是一个涉及多个步骤的复杂过程&#xff0c;且需要考虑到天猫的反爬虫策略。以下是一个简化的步骤指南&#xff1a; 步骤一&#xff1a;准备工作 环境准备&#xff1a;确保你的Python环境已经…

数据沿袭是止痛药还是维生素?

首先&#xff0c;这在很大程度上取决于用户组织当前的使用案例及其成熟度。 在我看来&#xff0c;数据工程师喜欢查看数据流并对依赖关系有直观的了解&#xff0c;但他们最终真的会使用数据沿袭吗&#xff1f;使用频率是多少&#xff1f;具体用例是什么&#xff1f; 从我们的观…

<电力行业> - 《第12课:配电(2)》

5 配网的指标 配电网与广大用户紧密联系&#xff0c;所以配电网是否合格还是十分重要的。 评判配电网的标准&#xff0c;主要有四个指标&#xff1a; 供电可靠性&#xff1a;供电可靠性是指针对用户连续供电的可靠程度。网损率&#xff1a;网损率可定义为电力网的电能损耗量与…

问题-小技巧-专业版Win11怎么启动电脑的休眠模式?

专业版Win11怎么启动电脑的休眠模式&#xff1f; powercfg -a powercfg -hibernate on 启用管理员面板依次输入上述命令就可以了。