canal实操应用

news2024/11/13 9:36:53

一、MySQL的binlog日志

1.1、binlog的分类

binlog一般分为三类:statement语句级,记录一条一条的SQL,一条SQL可能更改多行,且SQL语句中如果用到now()函数或者random()函数,会存在数据不一致的问题。row行级,记录一行行的数据,记录特别细致,但是日志文件会比较大。mixed:混合模式,默认还是statement,某些情况下,如UUID()函数就会用row的方式进行处理。

1.2、准备好数据库及表

我这里准备了一个canal库,一张student表

CREATE TABLE `student` (
  `id` int NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `sex` bit(1) DEFAULT NULL COMMENT '0:女,1:男',
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

小知识:mysql utf8mb4_0900_ai_ci和utf8mb4_general_ci区别?

utf8mb4_general_ci:
utf8mb4_general_ci 是较为简单的排序规则,对字符的比较是基于字符的二进制值的。
不考虑语言特定的规则,适用于一般的字符比较,但在某些语境下可能导致不符合期望的排序结果。
不区分大小写,但对于一些特殊字符,可能不按照某些语言的预期顺序排序。
utf8mb4_0900_ai_ci:
utf8mb4_0900_ai_ci 是 MySQL 8.0.0 版本后引入的排序规则,采用 Unicode 9.0.0 版本的规则。
它考虑了更多的语言和语境,提供更精确的字符排序,适用于多语言环境。
支持大小写不敏感的比较,并且对于一些特殊字符的排序更符合语言特定的规范。
选择哪种排序规则取决于你的具体需求。如果你的应用面向的是特定语言环境,需要更精确和符合语言规范的字符比较,那么 utf8mb4_0900_ai_ci 可能更适合。如果你对语言特定的排序规则没有特别的要求,或者你希望比较快速并且不涉及复杂的字符排序问题,那么 utf8mb4_general_ci 也是一个可行的选择。

CREATE TABLE my_table (
    column1 VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
    column2 VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
);

注:但是utf8只有utf8_general_ci这种字符集排序规则
在这里插入图片描述

二、实操

2.1、修改配置文件开启binlog

# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/8.0/en/server-configuration-defaults.html

[mysqld]
# [必须]主服务器唯一ID,千万注意做mysql集群的时候这个id不能重复,
# 而当前我们用canal同步也是,canal是伪装成mysql的slave,所以也不能和canal配置文件中的id重复
server-id=1
# [必须]启动二进制日志,指明路径。比如:自己本地的路径/log/mysqlbin
log-bin=mysql-bin
# [可选]设置需要复制的数据库,默认全部记录。比如
# 开启需要监控的数据库
binlog-do-db=replica_master_slave
binlog-do-db=canal
# binlog日志级别statement,row,mixed
#binlog_format=STATEMENT
binlog_format=ROW

修改完MySQL配置文件之后记得重启MySQL服务, systemctl restart mysqld
重启之后开是否真正开启binlog日志,可以插入一条数据前看一下日志大小,插入数据之后再查看一下binlog文件的大小,linu直接安装的一般在/var/lib/mysql文件夹下就可以看到binlog日志

2.2、canal用户赋值MySQL权限

最小权限原则,赋予canl用户读的权限即可,mysql -u root -p123456 密码写你自己的,登录客户端执行如下语句

# MySQL5.7需要执行如下2条语句
set global validate_password_length=4;
set global validate_password_policy=0;
#8.0使用下面一条语句即可
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
#也许你也和我一样,上面那条语句根本执行不成功,执行如下语句即可——先创建用户,再赋予权限
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

如下图只开放了读的权限
在这里插入图片描述

2.3、下载canal并使用

网址,我用的当前最新版,冲冲冲
下载deploy版本即可。
解压tar -zxvf canal.deployer-1.1.7.tar.gz -C /usr/local/cancal-1.1.7/之后如下图更改关键配置
tcp是自己写代码想怎么操作随自己来,然后就是写入各种MQ,请君自选。
在这里插入图片描述
在这里插入图片描述

三、搭建Java客户端

    <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

canal组件细节图
在这里插入图片描述

package org.tg.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @Author Sumuxi
 * @Date 2023/11/10 21:58
 * @Desc
 */

public class Main {


    public static void main(String[] args) throws InvalidProtocolBufferException {

        // 1。获取链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop113", 11111), "example", "", "");

        // 2.链接canal
        connector.connect();
        // 3.订阅数据库
        connector.subscribe("canal.*");
        while (true) {

            // 4.获取数据
            Message message = connector.get(100);// 获取指定数量的数据,有多少取多少,不会阻塞等待
            // 5.获取entry集合
            List<CanalEntry.Entry> entryList = message.getEntries();

            // 集合是空,就等3s后再去拉取
            if (entryList.size() == 0) {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } else {
                // 遍历entryList,单条单条解析
                for (CanalEntry.Entry entry : entryList) {
                    // 1.获取表名
                    String tableName = entry.getHeader().getTableName();

                    // 2.获取类型
                    //    TRANSACTIONBEGIN      事务开启,
                    //    ROWDATA               行数据,
                    //    TRANSACTIONEND        事务关闭
                    //    HEARTBEAT             心跳,
                    //    GTIDLOG               GTID日志;
                    CanalEntry.EntryType entryType = entry.getEntryType();

                    // 3.获取序列化后的数据
                    ByteString storeValue = entry.getStoreValue();
                    // 4.判断当前entry的类型是什么?
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 5.反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 6.获取事件的操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7.获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();

                        for (CanalEntry.RowData rowData : rowDataList) {
                            // 更新前
                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            // 更新后
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }

                            System.out.println(
                                    "Table:" + tableName +
                                            ",EventType:" + eventType +
                                            ",Before:" + beforeData +
                                            ",After:" + afterData);
                        }
                    } else {
                        System.out.println("当前操作类型为:" + entryType);
                    }
                }
            }
        }
    }
}

最后,其实参考github上的这个实例也能书写客户端,https://github.com/alibaba/canal/wiki/ClientExample,不妥之处,盼多多指正,后续持续完善。
纨绔不饿死,儒冠多误身。读书破万卷,下笔如有神。致君尧舜上,再使风俗淳。骑驴三十载,旅食京华春。朝扣富儿门,暮随肥马尘埃。残杯与冷炙,到处潜悲辛。白鸥没浩荡,万里谁能驯。——节选杜甫《奉赠韦左丞丈二十二韵》中的几句

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1194015.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大容量疯了!居然想把磁带放到硬盘,100TB+是否可以实现?

1.引言 上一篇关于大容量硬盘的文章&#xff08;HDD最后的冲刺&#xff1a;大容量硬盘的奋力一搏&#xff09;中&#xff0c;我们针对大容量硬盘研发状态&#xff0c;小编最近又有了新发现。WDC希望可以通过HDD和磁带结合&#xff0c;把盘的容量提升到100TB。 2.数据大爆炸的…

C# Socket通信从入门到精通(7)——单个异步TCP服务器监听单个客户端C#代码实现

前言: 我们在开发TCP服务器程序的时候,有的时候需要一些异步的应用,比如我读取客户端发送的数据,但是服务器程序不能一直等待客户端数据发送过来,服务器要先做一些别的事情,这个时候C# Socket通信从入门到精通(5)——单个同步TCP服务器监听一个客户端C#代码实现这篇文…

低代码平台受欢迎度排行榜:揭秘市场热门之选

对于企业而言&#xff0c;低代码平台不仅仅是一个开发工具&#xff0c;它更是一个加速器&#xff0c;推动了企业的数字化转型进程。传统的开发模式下&#xff0c;业务部门与IT部门之间常常存在沟通障碍&#xff0c;导致需求难以实现或实现速度缓慢。而低代码平台打破了这种障碍…

C++学习贴---C++预处理器

文章目录 前言预处理器#define预处理条件编译#ifdef#ifndef#if、#elif、#else 和 #endif #和##运算符 预定义宏 前言 预处理器 预处理器是指一些指示编译器在实际编译之前所需要完成的指令。 预处理器负责处理以**井号&#xff08;#&#xff09;**开头的预处理指令&#xff0…

为啥$p(w|D)=p(y|X,w)$?

为啥 p ( w ∣ D ) p ( y ∣ X , w ) p(w|D)p(y|X,w) p(w∣D)p(y∣X,w)&#xff1f; p ( w ∣ X , y ) p ( w ∣ D ) p(w|X,y)p(w|D) p(w∣X,y)p(w∣D), p ( w ∣ D ) p ( D , w ) / p ( D ) p(w|D)p(D,w)/p(D) p(w∣D)p(D,w)/p(D)为啥 p ( D ∣ w ) p ( y ∣ X , w ) p(D|…

PLC开放式以太网通信网络状态查看工具netstat

在进行PLC的开放式以太网通信时,为了查看网络状态我们可以利用ping这个强有力的工具,还可以使用netstat这个工具。 博途PLC开放式以太网通信 UDP通信 博途PLC 1200/1500PLC开放式以太网通信TSEND_C通信(UDP)_RXXW_Dor的博客-CSDN博客文章浏览阅读1.7k次。开放式TSEND_C通信…

大数据毕业设计选题推荐-污水处理大数据平台-Hadoop-Spark-Hive

✨作者主页&#xff1a;IT研究室✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

【Java】记一次服务内实现排队消费模式

主要是记录一下实现过程和实现的过程中遇到的坑。 我的业务 系统中有一个接口&#xff0c;是从大数据那边拉数据&#xff0c;之前的做法是&#xff0c;开个线程池&#xff0c;让SQL去执行&#xff0c;可是如果大量的慢SQL同时&#xff0c;请求数据库的话会适得其反。并且还有…

Python语法基础(字符串 列表 元组 字典 集合)

目录 字符串(str)字符串的创建特殊情况字符串的转义字符字符串的运算符字符串常用方法求字符串长度去掉多余空格是否包含某子串分割字符串合并字符串替换字符串统计统计字符串出现的次数 练习&#xff1a;判断字符串是否为回文串 列表(list)列表的创建列表常用方法遍历列表列表…

小程序如何设置下单提示语句

下单提示会展示在购物车和提交订单页面&#xff0c;它可以帮助商家告知客户事项&#xff0c;提高用户体验和减少错误操作。例如提示&#xff1a;商品是否包邮、某些区域是否发货、商品送达时间等等。 在小程序管理员后台->配送设置处&#xff0c;填写下单提示。在设置下单提…

基于ssm的高校失物招领管理系统

基于ssm的高校失物招领管理系统 摘要 失物招领管理系统是一种利用现代信息技术&#xff0c;为高校提供高效、便捷的失物招领服务的平台。本系统基于SSM框架&#xff08;Spring SpringMVC MyBatis&#xff09;&#xff0c;充分利用了各框架的优势&#xff0c;实现了系统的稳定…

1.微服务与SpringCloud

微服务和SpringCloud 文章目录 微服务和SpringCloud1.什么是微服务2.SpringCloud3. 微服务 VS SpringCloud4. SpringCloud 组件5.参考文档6.版本要求 1.什么是微服务 微服务是将一个大型的、单一的应用程序拆分成多个小型服务&#xff0c;每个服务实现特定的业务功能&#xff…

C#上位机序列10: Winform上位机通用框架

C#上位机序列1: 多线程&#xff08;线程同步&#xff0c;事件触发&#xff0c;信号量&#xff0c;互斥锁&#xff0c;共享内存&#xff0c;消息队列&#xff09; C#上位机序列2: 同步异步(async、await) C#上位机序列3: 流程控制&#xff08;串行&#xff0c;并行&#xff0c…

防火防盗防小人 使用 Jasypt 库来加密配置文件

⚔️ 项目配置信息存放在哪&#xff1f; 在日常开发工作中&#xff0c;我们经常需要使用到各种敏感配置&#xff0c;如数据库密码、各厂商的 SecretId、SecretKey 等敏感信息。 通常情况下&#xff0c;我们会将这些敏感信息明文放到配置文件中&#xff0c;或者放到配置中心中。…

原厂监视综合控制继电器 ZZS-7/1 AC220V 凸出端子固定安装

ZZS-7/11分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/12分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/13分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/14分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/102分闸、合闸、电源监视综合控制装置…

GIT的安装与常见命令

Git的介绍 Git是一个开源的分布式版本控制系统&#xff0c;最初由Linus Torvalds在2005年创建用于管理Linux内核的开发&#xff0c;现在已成为全球最流行的版本控制工具之一。 Git可以跟踪代码的修改&#xff0c;记录开发历程&#xff0c;保证多人合作开发时代码的一致性&…

关于maven读取settings.xml文件的优先级问题

今天在IDEA中配置maven的setting.xml文件路径指向的.m2路径下的setting_a.xml文件&#xff0c;同时&#xff0c;我的maven3.6.3也放在.m2中。 [1] .m2文件夹 [2] apache-maven-3.6.3文件夹 然后&#xff0c;在IDEA中打包发布时发现&#xff0c;无论如何都读取不到指定的setti…

【Linux】Linux常用命令—磁盘管理、压缩包管理

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…

猫罐头哪家好?宠物店自用的5款猫罐头推荐!猫咪嘎嘎炫~

亲爱的铲屎官们&#xff0c;你们是否会为猫咪选购猫罐头而感到烦恼&#xff1f;你们是否渴望了解哪些猫罐头在宠物界有着良好的口碑&#xff1f;猫罐头&#xff0c;作为猫咪日常饮食中的重要组成部分&#xff0c;其品质直接影响到猫咪的健康和幸福。 猫罐头哪家好&#xff1f;作…

Vue的vant notify组件报错Notify is not defined

解决方法&#xff1a; 原创作者&#xff1a;吴小糖 创作时间&#xff1a;2023.11.10