中间件Canal之Canal简单使用

news2024/11/13 9:35:15

一. 简单介绍

CanalJava开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,Canal主要支持了MySQLBinlog解析,解析完成后才能利用Canal Client来处理获得的相关数据。

二. MySQL的Binlog

2.1. Binlog是什么?

MySQL的二进制可以说MySQL最重要的日志了,它记录了所有DDLDML(除了数据查询语句)语句,以事件形式记录,还包括所有执行的消耗时间,MySQL的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:

  • MySQL ReplicationMaster端开启BinlogMaster把它的二进制日志传递给Slaves来达到Master-Slave数据一致性的目的;
  • 数据恢复,通过使用MySQL Binlog工具来恢复数据

二进制日志包括两类文件:二进制索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDLDML(除数据查询语句)语句事件。

2.2. Binlog的分类

MySQL Binlog的格式有三种,分别是statementmixedrow。在配置文件中可以选择配置binlog_format=statement|mixed|row。三种格式的区别:

  • statement:语句级,binlog会记录每一次执行写操作的语句。相对row模式节省空间,但是可能产生不一致性,比如“update tt set create_time=now()”,如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同;优点是节省空间;缺点就是可能造成数据不一致;
  • row:行级,binlog会记录每次操作后每行记录的变化;优点是保持数据的绝对一致性(因为不管SQL是什么,引用了什么函数,它只记录执行后的结果);缺点是占用空间大。
  • mixedstatement的升级版,一定程度上解决了,因为一些情况而造成的statement模式不一致问题,默认还是statement,在某些情况下譬如:当函数包含UUID()时、包含auto_increment字段的表被更新时、执行insert delayed语句时、用duf时,会按照row的方式进行处理;优点是节省空间,同时兼顾了一定的一致性;缺点是还是存在极个别的情况依旧会造成数据不一致,另外statementmixed对于需要对binlog的监控的情况都不方便。

所有从上面比较来看Canal想做监控分析,选择row格式比较合适。

三. 工作原理

3.1. MySQL主从复制过程

简单过程如下:

  • Master主库将改变记录,写到二进制日志(Binary Log)中;
  • Slave从库向MySQL Master发送dump协议,将Master主库的binary log events拷贝到它的中继日记(relay log);
  • Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库;

3.2. Canal的工作原理

Canal的工作原理很简单,就是把自己伪装成Slave,假装从Master复制数据。

四. 使用场景

Canal使用场景如下:

  • 原始场景:阿里Otter中间件的一部分,Otter是阿里用于异地数据库之间的同步框架,Canal是其中的一部分;
  • 更新缓冲
  • 抓取业务表的新增变化数据,用于制作实时统计

五. MySQL环境配置

关于MySQL环境的搭建可以参看上一篇文章:MySQL之主从复制集群搭建

六. 安装Canal

在canal中下载:https://github.com/alibaba/canal/releases
在这里插入图片描述
在服务器中创建一个canal的工作目录,解压到此目录:

tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C canal-1.17

解压之后我们只需要关注conf和bin这两个目录中的文件就可以:
在这里插入图片描述

注意:canal的通用配置,canal端口默认就是11111,修改canal的输出model,默认tcp。

conf下面的example是表示一个实例,每个实例下面都有一个instance.properties;如果需要多个实例处理不同的MySQL数据,只需要拷贝出多个example,并对其重新命名;最后修改canal.properties中的canal.destinations=xxx,xxx1,xxx2

接着配置example下的配置文件:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# 因为canal是模拟了一个slave所以这里需要配置slaveId
canal.instance.mysql.slaveId=3

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# 配置master的地址
canal.instance.master.address=192.168.31.174:33306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
# 配置master的账号密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

接着启动Canal:

bin/startup.sh  # 启动
bin/stop.sh # 关闭

七. 监控测试

接着创建一个maven项目,并添加相关依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.6</version>
</dependency>

接着看一下如何连接Canal并监控库表数据变换:

import com.alibaba.fastjson2.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author: Eternity.麒麟
 * @description: canal简单使用
 * @date: 2023/2/3 17:32
 * @version: 1.0
 */
public class CanalClient {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // 连接canal服务器
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.31.174", 11111),
                "example",
                "",
                "");
        System.out.println("开始监听......");
        while (true) {
            // 连接
            connector.connect();
            // 订阅的库和表
            connector.subscribe("cluster_db.*");
            // 一次拉取的数据量
            Message message = connector.get(10);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (!entries.isEmpty()) {
                for (CanalEntry.Entry entry : entries) {
                    // 表名
                    String tableName = entry.getHeader().getTableName();
                    // 类型
                    CanalEntry.EntryType type = entry.getEntryType();
                    switch (type) {
                        // 数据变更
                        case ROWDATA -> {
                            // 获取数据
                            ByteString storeValue = entry.getStoreValue();
                            // 解析数据
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            // 事件类型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            // 获取行数据
                            List<CanalEntry.RowData> datasList = rowChange.getRowDatasList();
                            for (CanalEntry.RowData rowData : datasList) {
                                JSONObject beforeData = new JSONObject();
                                // 变更之前数据
                                rowData.getBeforeColumnsList().forEach(item -> beforeData.put(item.getName(), item.getValue()));
                                JSONObject afterData = new JSONObject();
                                // 变更之后数据
                                rowData.getAfterColumnsList().forEach(item -> afterData.put(item.getName(), item.getValue()));
                                System.out.println("表名: " + tableName + ", 事件类型: " + eventType + ", 变更之前: " + beforeData + ", 变更之后: " + afterData);
                            }
                        }
                        default -> System.out.println("当前操作类型为:" + type);
                    }
                }
            }
        }
    }
}

启动之后,我们在表中进行增删改查,对应的canal监听结果如下:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/197122.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代码随想录算法训练营第37天 回溯算法 java :134. 加油站 135. 分发糖果 1005.K次取反后最大化的数组和

文章目录LeetCode 134. 加油站思路AC代码LeetCode135. 分发糖果思路AC代码LeetCode 1005.K次取反后最大化的数组和思路AC代码总结LeetCode 134. 加油站 思路 两个数组一个是 增加汽油量 gas[ ] 一个耗费汽油量 cost[ ] 可以换一个思路&#xff0c;首先如果总油量减去总消耗大…

OpenStack Yoga安装使用kolla-ansible

基本上是按照官网文档快速入门进行安装&#xff0c;不过还有很多地方需要换源。重点在换源这块。如果说你的网关有魔法&#xff0c;那就不用看这篇文章了&#xff0c;直接复制官网命令安装。 支持的操作系统 注意&#xff1a;不再支持 CentOS 7 作为主机操作系统。Train 版本同…

Java 的 IDEA 神级插件!

安装插件 1. Codota 代码智能提示插件 只要打出首字母就能联想出一整条语句&#xff0c;这也太智能了&#xff0c;还显示了每条语句使用频率。原因是它学习了我的项目代码&#xff0c;总结出了我的代码偏好。 如果让它再加上机器学习&#xff0c;人工智能写代码的时代还会远吗…

tkinter绘制组件(39)——滑动控件

tkinter绘制组件&#xff08;39&#xff09;——滑动控件引言布局函数结构响应按钮框架响应按钮的表示文本响应移动完整函数代码效果测试代码最终效果github项目pip下载结语引言 swipecontrol直译滑动控件&#xff0c;参考WinUI的SwipeControl。 虽然&#xff0c;这个控件在平…

凡人修C传——专栏从凡人到成仙系列目录

这里先感谢博主THUNDER王给我提出来的一个创作建议&#xff0c;让我有了创作的灵感来创建这一篇博客以及凡人修C传这一个系列的文章。 本文最主要的目的就是给大家一个凡人修C传的一个目录&#xff0c;让大家更加容易学到自己想学的地方。 &#x1f4dd;【个人主页】&#xff1…

js实现滑动进度条

效果图 完整代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" /><meta name"viewport" content"widthdevic…

TOOM舆情分析和报告工具,大数据决策免费舆情监控辅助工具?

大数据舆情工具是一种利用大数据技术进行舆情监控、分析、评估和预测的工具&#xff0c;以提高企业舆情应对能力。舆情监控工具可以帮助企业提高舆情应对能力&#xff0c;提升企业形象&#xff0c;以更好地处理各种舆情问题&#xff0c;TOOM舆情分析和报告工具&#xff0c;大数…

使用Python+Tensorflow的CNN技术快速识别验证码

近年来&#xff0c;机器学习变得愈加火热&#xff0c;中国选手柯洁与AlphaGo的人机大战更是引起热议。目前&#xff0c;在图像识别和视觉分析研究中&#xff0c;卷积神经网络&#xff08;CNN&#xff09;技术的使用越来越多。Tensorflow 是由 Google 团队开发的神经网络模块&am…

三级管集电极开路电路工作原理详细分析

今天给大家分享的是&#xff1a;集电极开路电路、集电极开路晶体管电路、集电极开路工作原理、集电极开路TTL、集电极开路输出接线图、集电极开路优缺点。 在数字芯片设计、微控制器应用和运算放大器中&#xff0c;集电极开始输出通常用于驱动继电器等高负载或用于连接其他电路…

从双钻模型看产品规划

作为产品经理&#xff0c;我们在进行产品规划的时候&#xff0c;往往是采用“探索→执行”的思维进行规划&#xff0c;然而这类方法虽然有效&#xff0c;但不全面&#xff0c;也不一定能够科学地指引我们去进行合理的产品规划。那么&#xff0c;有什么方式或模型能够让我们合理…

笔记_html

目录什么是 HTML?HTML元素(定义)骨架HTML元素a标签语法使用1-超链接使用2-锚点定位使用3-文件下载使用4-阻止a标签的默认事件HTML5新增元素HTML5新增元素属性什么是 HTML? HTML是由一系列元素组成的超文本标记语言。 tips: html标签不区分大小写&#xff01; HTML元素(定义)…

多核异构处理器对共享外设和资源的调配方法-飞凌嵌入式

来源&#xff1a;飞凌嵌入式官网www.forlinx.com在多核异构CPU中&#xff0c;多个内核就如同多个大脑&#xff0c;而外设和内存等资源就如同手足&#xff0c;那么多个大脑该如何控制手足才能保证它们正常有序地运行呢&#xff1f;以NXP i.MX8M Plus处理器的A核和M核为例&#x…

为HTML网页添加喜庆气氛的诸多方法

为HTML网页添加喜庆气氛的诸多方法 节假日&#xff0c;如春节&#xff0c;为网页&#xff08;或网站的主网页&#xff09;营造欢乐祥和氛围的手段&#xff0c;还是比较多的&#xff0c;下面介绍。 先给出未加喜庆气氛修饰的网页源码如下&#xff0c;特意做的简单&#xff0c;意…

Docker - 10. 本地镜像发布到阿里云

将本地镜像发布到阿里云&#xff0c;具体步骤如下&#xff1a; 1. 注册并登录阿里云控制台&#xff1a;阿里云登录平台 2. 进入容器镜像服务&#xff1a;阿里云 - 容器镜像服务 3. 创建个人实例&#xff0c;未创建前如下图1&#xff0c;创建后见下图2 4. 打开并创建命名空间…

XSS Labs (one)

Web Security Academy>>Cross-site scripting>>Contexts burpsuite官网XSS靶场地址 超全的fuzz payload Lab: Reflected XSS into HTML context with nothing encoded <script>alert(1)</script>弹出成功&#xff0c;最简单的反射型XSS靶场。 Lab: S…

2023牛客寒假算法基础集训营5 小沙の不懂(思维)

题意&#xff1a;这题目一开始看了半天啊&#xff0c;而且坑点很多&#xff08;qwq&#xff09;&#xff0c;大概意思就是给你两个字符串a,b&#xff08;可能存在前导0&#xff09;&#xff0c;然后给你任意个长度为10的从0到9的排列&#xff08;比如1234567890 &#xff0c;09…

攻防世界:crypt(RC4)

1、下载PE文件&#xff0c;控制台程序2、main函数大致分析&#xff0c;请看下面的注释int __cdecl main(int argc, const char **argv, const char **envp) {unsigned int str_length; // eaxunsigned int myflag_length; // eaxvoid *v5; // raxvoid *v7; // raxint i; // [rs…

软测(概念) · 开发模型 · 软件的生命周期 · 瀑布模型 · 螺线模型 · 增量模型 · 迭代模型 · 敏捷模型 · scrum · 软件测试模型之 V 模型 W 模型

一、开发模型的由来二、软件的生命周期三、瀑布模型&#xff08;Waterfall Model&#xff09;四、螺线模型&#xff08;Spiral Model&#xff09;五、增量模型&#xff08;Incremental Model&#xff09;六、迭代模型&#xff08;Rational UnifiedProcess&#xff09;七、敏捷模…

SQL Server 2014 数据库误删数据的恢复处理

一、序言 作为程序开发人员或 DBA&#xff0c;经常会接触到数据库&#xff08;以 SQL Server 2014 数据库为例&#xff09;的增、删、改查操作。执行 delete 语句时不小心误删数据表的记录情况&#xff0c;而数据库之前又没有任何备份。 SQL Server 数据库自身的数据库还原数…

使用蚁群优化 (ACO) 解决背包问题(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 背包问题&#xff08;Knapsack problem&#xff09;是一种组合优化的NP完全&#xff08;NP-Complete&#xff0c;NPC&#xff0…