Springboot整合Canal 实践经验

news2024/11/28 22:23:51

文章目录

  • 前言
  • 一、Canal 服务端:
    • 1.1 canal.properties:
    • 1.2 canal的监听实例:
  • 二、canal客户端
    • 2.1 客户端配置要监听的实例:
    • 2.2 通过连接获取信息
  • 总结


前言

本文是Springboot整合Canal 实践过程中经验记录;


一、Canal 服务端:

1.1 canal.properties:

该文件是canal 服务端的配置文件, 在改配置文件中需要修改如下:

# 启动端口,也是客户端连接的端口
canal.port = 11111
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 与canal 连接的客户端
# 如果是通过代码进行连接,这里为tcp
canal.serverMode = tcp
# canal 加载mysql 的实例
canal.destinations = example

1.2 canal的监听实例:

经过实践得知 canal.destinations 中定义的监听实例 与数据库中的某个实例名称是无关的:
在这里插入图片描述
也就是说 在canal.destinations 可以定义任意名字的实例,比如我们定义 aabbcc:

在这里插入图片描述

然后只需要在 canal\conf 的目录下新建一个文件夹,名字为 aabbcc 即可:
在这里插入图片描述
然后将 example 下的文件全部拷贝到 aabbcc 下:
在这里插入图片描述
然后设置要连接的数据库:

canal.instance.master.address=localhost:3406
canal.instance.dbUsername=root
canal.instance.dbPassword=ddsoft

可以看到上述配置里并没有配置某个具体的数据库实例,客户端在连接到服务端的 aabbcc 时实际上会得到这个实例连接下所有数据库实例数据的变化结果

二、canal客户端

2.1 客户端配置要监听的实例:

客户端通过 CanalConnectors.newSingleConnector 来创建连接对象:

@Bean
public CanalConnector canalConnector() {
  CanalConnector canalConnector1 = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", "11111"), "aabbcc", "", "");
     canalConnectors.add(canalConnector1);
     return canalConnector1;
 }

2.2 通过连接获取信息



import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

@Slf4j
@Component
public class CanalService {


    @Autowired
    private CanalConnector canalConnector;




    @Autowired
    private CanalListener canalListener;


    @PostConstruct
    public void run() {
        // 定义最后消费的位点
        long lastOffset = fetchFromPosition();

        while (true) {
            Message message = canalConnector.getWithoutAck(10);
            long batchId = message.getId();
            List<CanalEntry.Entry> entryList = message.getEntries();
            int size = message.getEntries().size();
            if (batchId == -1 || entryList.isEmpty()) {
                try {
                    // 线程休眠2秒
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            long nowOffset = entryList.get(0).getHeader().getLogfileOffset();
            if (nowOffset <= lastOffset) {
                continue;
            }
            try {

                canalListener.onMessage(message);
                canalConnector.ack(batchId);
                // 保存最后消费的位点
                lastOffset = message.getEntries().get(size - 1).getHeader().getLogfileOffset();
                savePositionState(lastOffset);
            } catch (Exception ex) {
                log.error("consume error:{}", ex.getMessage());
            }

        }
    }

    // 获取并设置消费的起始位点
    private long fetchFromPosition() {
        // Canal 连接器连接
        canalConnector.connect();
        // 订阅数据变更:这里是连接服务端 aabbcc 实例下 监听哪些表 其中biglog 和 bluegrass 都是改实例下的mysql 实例
        //  user,student,about_us 是各自数据库下的表
        canalConnector.subscribe("biglog.user|biglog.student|biglog.about_us|bluegrass.about_us");
        // 从存储中获取上次消费的位点
        long position = getPositionState();
        if (position != -1) {
            // 回滚到上次保存的位点
            canalConnector.rollback(position);
        }
        return position;
    }

    // 获取位点状态
    private static long getPositionState() {
        // TODO: 从存储中获取上次消费的位点
        return -1;
    }

    // 保存位点状态
    private static void savePositionState(long position) {
        // TODO: 将 position 保存到存储中
    }


}

这里设置的过滤 :biglog.user|biglog.student|biglog.about_us|bluegrass.about_us;biglog 和 bluegrass 是aabbcc 实例下你要监听的mysql 实例

在这里插入图片描述
user,student,about_us是各自数据库下的表:
在这里插入图片描述
这样设置 实际上会监听到 biglog 下的 user,student,about_us 表变动,bluegrass下的about_us 表变动

数据变动的消费:
CanalListener.java

public interface CanalListener {
    void onMessage(Message msg);
}

MyCanalListener.java


import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class MyCanalListener implements CanalListener {

    @Override
    public void onMessage(Message msg) {
        List<CanalEntry.Entry> entries = msg.getEntries();

        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                CanalEntry.RowChange rowChange = null;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse error", e);
                }
                String tableName = entry.getHeader().getTableName();
                CanalEntry.EventType eventType = rowChange.getEventType();
                List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                String schemaName = entry.getHeader().getSchemaName();
                // 处理数据变更事件
                for (CanalEntry.RowData rowData : rowDataList) {
                    switch (eventType) {
                        case INSERT:
                            // 处理插入事件
                            dealInsert(schemaName, tableName, rowData.getAfterColumnsList());
                            break;
                        case UPDATE:
                            // 处理更新事件
                            dealUpdate(schemaName, tableName, rowData.getAfterColumnsList());
                            break;
                        case DELETE:
                            // 处理删除事件
                            dealDelate(schemaName, tableName, rowData.getBeforeColumnsList());
                            break;
                        default:
                            break;
                    }
                }
            }
        }
    }

    private void dealDelate(String schemaName, String tableName, List<CanalEntry.Column> afterColumnsList) {
        Map<String, Object> dataMap = new HashMap<>();
        for (CanalEntry.Column column : afterColumnsList) {
            dataMap.put(column.getName(), column.getValue());
        }
//        log.debug("delate data:{}", afterColumnsList);
        log.debug("delate map data:{}", dataMap);
    }

    private void dealUpdate(String schemaName, String tableName, List<CanalEntry.Column> columns) {
        Map<String, Object> dataMap = new HashMap<>();
        for (CanalEntry.Column column : columns) {
            dataMap.put(column.getName(), column.getValue());
        }
//        log.debug("update data:{}", columns);
        log.debug("update map data:{}", dataMap);

    }

    private void dealInsert(String schemaName, String tableName, List<CanalEntry.Column> columns) {
        Map<String, Object> dataMap = new HashMap<>();
        for (CanalEntry.Column column : columns) {
            dataMap.put(column.getName(), column.getValue());
        }
//        log.debug("insert data:{}", columns);
        log.debug("insert map data:{}", dataMap);
    }
}


总结

canal 服务端定义的监听实例名称与数据库中的实例无关,通过建立一个实例并且在改实例中对instance.properties 设置连接的数据源,实际上可以监听到这个数据源下的所有mysql 实例库中所有表的数据变化;通过在客户端连接这个监听的实例,可以获取到该实例对应数据源下所有mysql 实例库中所有表的数据变化;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1392337.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux/Networked

Enumeration nmap 网站更新之后有了一个引导模式&#xff0c;更利于学习了&#xff0c;之前看ippsec的视频&#xff0c;要不总是没有思路&#xff0c;现在出现的问题多了提示也更多了&#xff0c;还没有使用&#xff0c;一会用用再说 首先&#xff0c;第一个问题是“目标上正…

2 python快速上手

2 python快速上手 快速上手1.编码&#xff08;密码本&#xff09;2.编程初体验3.输出4. 初识数据类型4.1 整形&#xff08;int&#xff09;4.2 字符串&#xff08;str&#xff09;4.3 布尔类型&#xff08;bool&#xff09;4.4 类型转换 5. 变量5.1 变量名的规范5.2 变量内存指…

项目架构之Zabbix部署

1 项目架构 1.1 项目架构的组成 业务架构&#xff1a;客户端 → 防火墙 → 负载均衡&#xff08;四层、七层&#xff09; → web缓存/应用 → 业务逻辑&#xff08;动态应用&#xff09; → 数据缓存 → 数据持久层 运维架构&#xff1a;运维客户端 → 跳板机/堡垒机&#x…

探索Python数据结构与算法:解锁编程的无限可能

文章目录 一、引言1.1 数据结构与算法对于编程的重要性1.2 Python作为实现数据结构与算法的强大工具 二、列表和元组2.1 列表&#xff1a;创建列表、索引、切片和常用操作2.2 元组&#xff1a;不可变序列的特性和使用场景 三、字符串操作和正则表达式3.1 字符串的常见操作和方法…

本地运行LlaMA 2的简易指南

大家好&#xff0c;像LLaMA 2这样的新开源模型已经变得相当先进&#xff0c;并且可以免费使用。可以在商业上使用它们&#xff0c;也可以根据自己的数据进行微调&#xff0c;以开发专业版本。凭借其易用性&#xff0c;现在可以在自己的设备上本地运行它们。 本文将介绍如何下载…

0003.为什么有的电流表需要使用分流器?

以下两款电流表&#xff0c;你仔细看能有什么发现&#xff1f; 除了量程一个是20A&#xff0c;一个是30A&#xff0c;还有什么区别&#xff1f; 仔细观察你会发现30A的电流表上还有一个20A电流表没有的参数75mV. 是的&#xff0c;这就是他们之间最大的差距。 要测量一…

HTML--基本结构构成

基本结构&#xff1a; 文档声明: <!DOCTYPE html> htm标签对 :<html> </html> head标签对&#xff1a; <head> </head> body标签对&#xff1a;<body> </body> 如下结构&#xff1a; <html> <head> <title>这是一…

修改iview的表格table展开的默认icon和样式

修改前 修改后 修改内容 .title_label_list .ivu-icon-ios-add{font-size: 26px;color: #888888; } .title_label_list .ivu-icon-ios-add:hover{color: #11AAAA; } .title_label_list .ivu-icon-ios-add:before {content: "\F341"; } .title_label_list .ivu-icon-…

JVM工作原理与实战(十八):运行时数据区-堆

专栏导航 JVM工作原理与实战 RabbitMQ入门指南 从零开始了解大数据 目录 专栏导航 前言 一、运行时数据区 二、堆 1.堆介绍 2.关键参数 总结 前言 ​JVM作为Java程序的运行环境&#xff0c;其负责解释和执行字节码&#xff0c;管理内存&#xff0c;确保安全&#xff0c…

Qt/QML编程之路:小键盘keyboard(36)

小键盘对于qml应用是经常用到的,在qml里面,就如一个fileDialog也要自己画一样,小键盘keyboard也是要自己画的,对于相应的每个按键的clicked都要一一实现的。 这里有一个示例: 代码如下: import QtQuick 2.5 import QtQuick.Controls 1.4 import QtQuick.Window 2.0 im…

【刷题】 leetcode 2 .两数相加

两数相加 两数相加1 思路一 &#xff08;暴毙版&#xff09;2 思路二 &#xff08;本质出发&#xff09; 谢谢阅读Thanks♪(&#xff65;ω&#xff65;)&#xff89;下一篇文章见&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 两数相加 我们来看…

DQN、Double DQN、Dueling DQN、Per DQN、NoisyDQN 学习笔记

文章目录 DQN (Deep Q-Network)说明伪代码应用范围 Double DQN说明伪代码应用范围 Dueling DQN实现原理应用范围伪代码 Per DQN (Prioritized Experience Replay DQN)应用范围伪代码 NoisyDQN伪代码应用范围 部分内容与图片摘自&#xff1a;JoyRL 、 EasyRL DQN (Deep Q-Networ…

Queue详解(Java)

Queue详解 Java 中的队列&#xff08;Queue&#xff09;是一种数据结构&#xff0c;它遵循先进先出&#xff08;FIFO&#xff09;的原则。队列可以用于在一个集合中保存一组元素&#xff0c;并支持在队列的尾部添加元素&#xff0c;以及在队列的头部移除元素。 Java 标准库提…

CleanMyMac X .4.14.7如何清理 Mac 系统?

细心的用户发现苹果Mac电脑越用越慢&#xff0c;其实这种情况是正常的&#xff0c;mac电脑用久了会产生很多的缓存文件&#xff0c;如果不及时清理会影响运行速度。Mac系统在使用过程中都会产生大量系统垃圾&#xff0c;如不需要的系统语言安装包&#xff0c;视频网站缓存文件&…

Spring全家桶

官网 Spring | Home 一、市面上主流的Spring框架以及简介 Spring Framework&#xff1a;Spring Framework是最基础、最核心的Spring框架&#xff0c;提供了IoC&#xff08;控制反转&#xff09;和AOP&#xff08;面向切面编程&#xff09;等功能。它是其他Spring项目的基础&am…

统计学-R语言-5.2

文章目录 前言大数定理中心极限定理和抽样分布抽样分布样本均值的分布样本比例的分布练习 前言 本篇文章将继续上篇的进行介绍。 大数定理 大数定理大数定理”的另一种表达方式是“均值定理”&#xff0c;其含义是&#xff0c;随机变量X多个观察值的均值会随着观察值的增加越…

探索 Python:发现有趣的库——第 1 章:数据可视化之旅

在一个充满活力的科技世界中&#xff0c;数据分析专家“算法仙”和编程爱好者“代码侠”相遇了&#xff0c;决定一起踏上数据可视化的探险之旅。他们将运用 Matplotlib 和 Seaborn 这两个强大的 Python 库&#xff0c;将枯燥的数据转化为生动的图形。 算法仙&#xff1a;你好&…

【部署LLaMa到自己的Linux服务器】

部署LLaMa到自己的Linux服务器 1、Llama2 项目获取方法1&#xff1a;有git可以直接克隆到本地方法2&#xff1a;直接下载 2、LLama2 项目部署3、申请Llama2许可4、下载模型权重5、运行 1、Llama2 项目获取 方法1&#xff1a;有git可以直接克隆到本地 创建一个空文件夹然后鼠标…

vscode无法自动补全

前提&#xff1a;安装c/c插件 c/c插件功能非常强大&#xff0c;几乎能满足日常编码过程中常用的功能&#xff1b;因此也包含自动补全的功能&#xff0c;开启方法如下&#xff1a; 文件->首选项->设置&#xff1a; 扩展->c/c->Intellisense&#xff0c;找到Intell…

docker-compose和docker compose的区别

在docker实际使用中&#xff0c;经常会搭配Compose&#xff0c;用来定义和运行多个 Docker 容器。使用时会发现&#xff0c;有时候的指令是docker-compose&#xff0c;有时候是docker compose&#xff0c;下面给出解释。 docker官方文档&#xff1a;https://docs.docker.com/c…