Canal1--搭建Canal监听数据库变化

news2025/1/11 3:52:49

1.安装mysql

默认安装了mysql(版本8.0.x);

新创建用户

-- 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';

授权

grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' with grant option;

flush privileges;

在这里插入图片描述

查看MySQL是否开启binlog模式

show variables like 'log_bin';

在这里插入图片描述
查看当前正在写入的binlog日志:

show master status;

在这里插入图片描述
记住文件名和偏移量

2.安装Canal

去官网下载页面进行下载;

我这里下载的是1.1.7的版本:
在这里插入图片描述
解压canal.deployer-1.1.7.tar.gz,我们可以看到里面有五个文件夹:
在这里插入图片描述
打开配置文件conf/example/instance.properties,配置信息如下:

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0

# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=自己的日志名称
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=日志的偏移量
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=

开启Canal服务端:
进入bin目录

.\startup.bat

3.Java客户端操作

首先引入maven依赖:

<!--canal客户端-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.7</version>
        </dependency>

然后创建一个CanalClient类

import com.alibaba.otter.canal.client.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements InitializingBean {

    private final static int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有数据,处理数据
                    printEntry(message.getEntries());
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 打印canal server解析binlog获得的实体类信息
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                //开启/关闭事务的实体类型,跳过
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            RowChange rowChage;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //获取操作类型:insert/update/delete类型
            EventType eventType = rowChage.getEventType();
            //打印Header信息
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
            }
            //获取RowChange对象里的每一行数据,打印出来
            for (RowData rowData : rowChage.getRowDatasList()) {
                //如果是删除语句
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    //如果是新增语句
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    //如果是更新的语句
                } else {
                    //变更前的数据
                    System.out.println("------->; before");
                    printColumn(rowData.getBeforeColumnsList());
                    //变更后的数据
                    System.out.println("------->; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

完成之后再对应数据库进行操作,控制台会打印对应的操作,说明对数据的写入操作进行了有效的监控;
注意只读操作并不会写入binlog也不会被Canal监控到(也没必要监控读取操作)。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1614480.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用AOP切面做防止用户重复提交功能

在我们的项目中&#xff0c;需要考虑到有时候因为网络原因或者其他原因用户对同一个接口进行同一批数据的重复性操作&#xff0c;如果不做这样的处理很可能会在数据库中添加多条同样的数据。 我们可以通过使用aop来解决这样的问题&#xff0c;接下来看看具体怎么做吧~ 自定义…

c语言中,数组取地址的书写格式

数组取地址 为了更好的区分数组取地址时的情况&#xff0c;我们建立两个数组&#xff0c;arr1一维数组和arr2二维数组&#xff0c;用printf函数来打印出每个例子arr1和arr2的地址&#xff0c;这样可以更加直观的区分出来。 首先我们看到第一组打印&#xff0c;可以看到若是直接…

Python | Leetcode Python题解之第37题解数独

题目&#xff1a; 题解&#xff1a; class Solution:def solveSudoku(self, board: List[List[str]]) -> None:def dfs(pos: int):nonlocal validif pos len(spaces):valid Truereturni, j spaces[pos]for digit in range(9):if line[i][digit] column[j][digit] bloc…

jmeter 指定QPS压测接口

文章目录 jmeter 指定QPS压测接口更换语言为中文创建测试任务新建线程组右键线程组&#xff0c;新建http request&#xff0c;填写要你要压测的接口地址、参数如果需要自定义请求头&#xff0c;添加一个Http头信息管理器要查看结果和QPS统计数据&#xff0c;给上门的http请求添…

JVM虚拟机(十二)ParallelGC、CMS、G1垃圾收集器的 GC 日志解析

目录 一、如何开启 GC 日志&#xff1f;二、GC 日志分析2.1 PSPO 日志分析2.2 ParNewCMS 日志分析2.3 G1 日志分析 三、GC 发生的原因3.1 Allocation Failure&#xff1a;新生代空间不足&#xff0c;触发 Minor GC3.2 Metadata GC Threshold&#xff1a;元数据&#xff08;方法…

poll实现echo服务器的并发

poll实现echo服务器的并发 代码实现 #include <stdio.h> #include <string.h> #include <sys/types.h> #include <sys/socket.h> #include <stdlib.h> #include <arpa/inet.h> #include <sys/time.h> #include <unistd.h> #…

c++的智能指针(5) -- weak_ptr

概述 我们在使用shared_ptr会出现以下的问题&#xff0c;会导致内存泄露。 代码1: 类内指针循环指向 #include <iostream> #include <memory>class B;class A { public:A() {std::cout << "Construct" << std::endl;}~A() {std::cout <…

基于开源CrashRpt与微软开源Detours技术深度改造的异常捕获库分享

目录 1、异常捕获模块概述 2、为什么需要异常捕获模块&#xff1f; 3、在有些异常的场景下是没有生成dump文件的 4、开源异常捕获库CrashRpt介绍 5、对开源库CrashRpt的改进 C软件异常排查从入门到精通系列教程&#xff08;专栏文章列表&#xff0c;欢迎订阅&#xff0c;持…

# 从浅入深 学习 SpringCloud 微服务架构(二)模拟微服务环境(1)

从浅入深 学习 SpringCloud 微服务架构&#xff08;二&#xff09;模拟微服务环境&#xff08;1&#xff09; 段子手168 1、打开 idea 创建父工程 创建 artifactId 名为 spring_cloud_demo 的 maven 工程。 --> idea --> File --> New --> Project --> Ma…

基于贝叶斯算法的机器学习在自动驾驶路径规划中的应用实例

目录 第一章 引言 第二章 数据准备 第三章 贝叶斯路径规划模型训练 第四章 路径规划预测 第五章 路径执行 第六章 实验结果分析 第一章 引言 自动驾驶技术的发展带来了自动驾驶车辆的出现&#xff0c;而路径规划作为自动驾驶车辆的关键功能之一&#xff0c;对于确定最佳行…

锐捷校园网自助服务系统 operatorReportorRoamService SQL注入漏洞致RCE漏洞复现

0x01 产品简介 锐捷校园网自助服务系统是锐捷网络推出的一款面向学校和校园网络管理的解决方案。该系统旨在提供便捷的网络自助服务,使学生、教职员工和网络管理员能够更好地管理和利用校园网络资源。 0x02 漏洞概述 锐捷校园网自助服务系统 operatorReportorRoamService 接…

STP学习的第一篇

1.STP的基本概念&#xff1a;根桥 &#xff08;1&#xff09;STP的主要作用之一是在整个交换网络中计算出一棵无环的“树”&#xff08;STP树&#xff09;。 &#xff08;2&#xff09;根桥是一个STP交换网络中的“树根”。 &#xff08;3&#xff09;STP开始工作后&#xf…

一、MinIO基本知识

MinIO基本知识 一、简介1.许可 二、部署1.Docker部署1.1 部署容器 1.2 MinIO页面访问1.3 创建Bucket![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/6c8aa92975f146b691f1f36ce1033e7c.png) 三、Python-API1.安装包2.Bucket、Object概念3.Bucket-API4.MinIOClient-…

【Yolov系列】Yolov5学习(一)补充1.2:自适应锚框计算详解+代码注释

一、自适应锚框计算详解 自适应锚框计算的具体过程&#xff1a; ①获取数据集中所有目标的宽和高。 ②将每张图片中按照等比例缩放的方式到 resize 指定大小&#xff0c;这里保证宽高中的最大值符合指定大小。 ③将 bboxes 从相对坐标改成绝对坐标&#xff0c;这里…

余氯控制器的功能优势简介

余氯控制器是一款智能化的水质监测设备&#xff0c;它采用高精度AD转换和单片机微处理技术&#xff0c;能够完成余氯值的高精度测量。这款控制器具备时间显示、数据存储等基本功能。 高智能化设计&#xff1a;余氯控制器采用了高精度AD转换和单片机微处理技术&#xff0c;确保…

VisualGLM-6B的部署步骤

对于如下命令&#xff0c;你将完全删除环境和环境中的所有软件包 conda remove -n env_name --all 一、VisualGLM-6B环境安装 1、硬件配置 操作系统&#xff1a;Ubuntu_64&#xff08;ubuntu22.04.3&#xff09; GPU&#xff1a;4050 显存&#xff1a;16G 2、配置环境 建…

防水型RTU IP68防水遥测终端机

在工业物联网的领域中&#xff0c;防水型RTU(Remote Terminal Unit)具有不可或缺的重要性。作为工业设备的守护神&#xff0c;它在实现数据采集和传输、远程控制和预警告警的同时&#xff0c;还能保障设备免受水分侵害&#xff0c;确保系统稳定安全的运行。    计讯物联防水…

JDK 11下载、安装、配置

下载 到Oracle管网下载JDK 11&#xff0c;下载前需要登录&#xff0c;否则直接点下载会出现502 bad gateway。 下载页面链接 https://www.oracle.com/hk/java/technologies/downloads/#java11-windows 登录 有些人可能没有Oracle账号&#xff0c;注册也比较慢&#xff0c;有需…

2024_GAMES101作业环境配置Mac(intel)_VSCode_Clion

目录 VSCodeClionCMakeList.txt VSCode brew install cmake 更换下载源为阿里云下载 opencv&#xff0c;不然会很慢 cd "$(brew --repo)" git remote -v cd "$(brew --repo)" git remote set-url origin https://mirrors.aliyun.com/homebrew/brew.git…

Python --- 基于Iris flower数据集的kNN分类实战

基于Iris flower数据集的kNN分类实战 Iris data set(鸢尾花数据集简介) 鸢尾花数据集共包含三种鸢尾花&#xff1a;Iris setosa, Iris virginica and Iris versicolor。 Iris setosa&#xff08;山鸢尾&#xff09; Iris virginica&#xff08;维吉尼亚鸢尾 &#xff09; Iris …