Idea+maven+springboot项目搭建系列--3 整合阿里云Canal完成Mysql数据的监听

news2025/1/16 15:59:43

前言:在搭建canal 服务之后,项目中就可以连接canal ,完成对感兴趣的数据库及其表中数据的变动完成监听,本文依赖于你已经完成了对canal 服务的搭建工作;

1 Cannal 特点:

Canal是阿里巴巴开源的一款基于MySQL数据库的增量数据订阅和消费组件,能够把MySQL的binlog日志解析,转换成多种类型的事件回调给应用消费或同步到数据存储或搜索引擎等其他数据终端。它提供了类似数据流的方式,对数据进行持续性同步和协作,大幅度提升了数据处理的效率。

Canal的主要特点包括:

  • 支持多种数据存储形式,包括MySQL、Oracle等,其设计和性能都针对常见的数据存储场景进行了优化;
  • Canal结构清晰、易于上手,并且提供了完善的API和文档,能够方便地实现对数据流的订阅、消费和处理;
  • Canal采用微服务架构,支持高可用和集群部署,能够保证数据的持久性和一致性,同时能够支持分布式的数据处理和存储;
  • . Canal具有良好的稳定性、可靠性和灵活性,在实现数据库变更的实时同步和数据处理等方面具有广泛的应用价值。

Canal是一款高效、可靠、易用的开源组件,在数据库的数据同步和处理方面具有广泛的应用价值,同时也为企业快速实现数据同步和大规模数据处理提供了有效的技术支持。

2 springBoot 集成:

2.1 先来看下 Canal 客户端的消费流程:

  • canal server作为一个程序独立运行,在启动时会创建一个或多个线程来获取Binlog数据,同时监听与下游客户端的连接请求。

  • canal server连接MySQL主库或从库,并通过MySQL的binlog dump协议实现对Binlog数据的实时抓取。通过数据库相关API,canal server订阅mysql的binlog,并将获取到的Binlog写入内存或磁盘文件中。

  • 当canal server捕获到新的Binlog数据时,会检测对应的数据源是否有相应的订阅(即canal.destinations参数配置),并将Binlog数据发送到对应的MQ Topic中。RocketMQ或其他MQ中间件将会缓存这些数据,等待下游的消费者来消费。

  • canal客户端作为消费者,通过订阅对应的MQ topic来获取Binlog数据,以实现对应用层的实时数据消费和处理。canal客户端在特定时刻发起订阅请求后,RocketMQ或其他MQ中间件会将缓存的数据推送给该客户端。

  • canal客户端在获取到Binlog数据后,会进行相应的处理和解析,并将结果写入配置的目标数据存储中(如MySQL、Redis等)。

canal server通过监听MySQL binlog dump协议,实现对Binlog实时抓取,并将数据推送到指定的MQ topic中。canal客户端可以通过订阅MQ topic得到实时的Binlog数据,以完成数据消费和处理,从而实现了MySQL数据的实时同步和处理。

通过canal 的消费流程我们可以知道:

  • canal server 端,会获取到mysql 的数据变化时,将数据解析后,封装成MQ 消息,放入到对应的topic 中;
  • canal client 端,会通过订阅canal server 端 感兴趣的topic 从而消费数据;

canal 实际上就是一个生产者和消费者的模型;canal server在默认情况下使用了RocketMQ作为消息队列,用于将解析出的binlog数据发送到下游的消费端,实现数据的异步传输和消息的可靠投递。

2.2 客户端 pom 依赖引入:每项依赖都进行了标注

 <!-- 提供http 访问-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 提供实体类的get ,set ,toString 简化代码使用 -->
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <optional>true</optional>
</dependency>
<!-- 单元测试 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<!-- canal 客户端的集成 -->
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.6</version>
</dependency>
<!-- 客户端通信 消息传递 -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.6</version>
</dependency>

2.3 客户端配置文件:
application.properties

# canal配置
# Canal服务器的IP地址
canal.host=localhost
# Canal监听的端口
canal.port=11111
# 监听的库名,可以自定义
canal.destination=biglog
# Canal连接MySQL的用户名
canal.username=canal
# Canal连接MySQL的密码
canal.password=canal

# Canal 客户端每次向 服务端拉取消息的数量
canal.client.batch.size=10
# Canal 客户端 感兴趣的表
canal.client.subscribe.filter=biglog.user|biglog.student|biglog.about_us



logging.level.root=debug

这里对配置的参数进行简单说明:

  • canal.host : canal server 的ip 地址;
  • canal.port : canal server 的 端口 默认11111 可以通过 服务端的canal.properties 配置文件进行修改;
    在这里插入图片描述
  • canal.destination : 要监听的mysql 实例名称;这个参数用来区分客户端消费服务端哪个topic下的消息;
  • canal.username:要监听的mysql 实例连接用户名
  • canal.password: 要监听的mysql 实例连接密码
  • canal.client.batch.size: 客户端每次向服务端拉取的消息数量;
  • canal.client.subscribe.filter: 过滤客户端感兴趣的表,正则表达,多个正则之间以逗号(,)分隔,转义符需要双斜杠(\);

这里会发现:
(1)在服务端和客户端都进行了canal.destination 的配置,服务端canal.destination 配置了需要监听哪些mysql 的实例,并将mysql 实例的数据解析后放入 不同的topic 队列中;客户端的canal.destination 则定义了从哪个topic 消费数据;
(2)在服务端和客户端都进行了canal 连接mysql 实例用户名和密码的配置,服务端 配置的mysql用户是用来连接到mysql 的主/从 节点从而拿到binlog;客户端配置的mysql用户则是可以用来获取MySQL的表结构信息,比如列名、列类型、列约束等;

2.4 配置类装载:

  1. CanalClientConfig.java
package com.example.spring_canal.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

@Configuration
public class CanalClientConfig {
    @Value("${canal.host}")
    private String host;
    @Value("${canal.port}")
    private int port;
    @Value("${canal.destination}")
    private String destination;
    @Value("${canal.username}")
    private String username;
    @Value("${canal.password}")
    private String password;



    @Bean
    public CanalConnector canalConnector() {
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), destination, username, password);
    }

}

2)定义消息消费接口 CanalListener:

package com.example.spring_canal.listener;

import com.alibaba.otter.canal.protocol.Message;

public interface CanalListener {
    void onMessage(Message msg);
}

3)定义消息消费业务实现 MyCanalListener:

package com.example.spring_canal.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class MyCanalListener implements CanalListener {

    @Override
    public void onMessage(Message msg) {
        List<CanalEntry.Entry> entries = msg.getEntries();

        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                CanalEntry.RowChange rowChange = null;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse error", e);
                }
                String tableName = entry.getHeader().getTableName();
                CanalEntry.EventType eventType = rowChange.getEventType();
                List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                String schemaName = entry.getHeader().getSchemaName();
                // 处理数据变更事件
                for (CanalEntry.RowData rowData : rowDataList) {
                    switch (eventType) {
                        case INSERT:
                            // 处理插入事件
                            dealInsert(schemaName, tableName, rowData.getAfterColumnsList());
                            break;
                        case UPDATE:
                            // 处理更新事件
                            dealUpdate(schemaName, tableName, rowData.getAfterColumnsList());
                            break;
                        case DELETE:
                            // 处理删除事件
                            dealDelate(schemaName, tableName, rowData.getBeforeColumnsList());
                            break;
                        default:
                            break;
                    }
                }
            }
        }
    }

    private void dealDelate(String schemaName, String tableName, List<CanalEntry.Column> afterColumnsList) {
        Map<String, Object> dataMap = new HashMap<>();
        for (CanalEntry.Column column : afterColumnsList) {
            dataMap.put(column.getName(), column.getValue());
        }
//        log.debug("delate data:{}", afterColumnsList);
        log.debug("delate map data:{}", dataMap);
    }

    private void dealUpdate(String schemaName, String tableName, List<CanalEntry.Column> columns) {
        Map<String, Object> dataMap = new HashMap<>();
        for (CanalEntry.Column column : columns) {
            dataMap.put(column.getName(), column.getValue());
        }
//        log.debug("update data:{}", columns);
        log.debug("update map data:{}", dataMap);

    }

    private void dealInsert(String schemaName, String tableName, List<CanalEntry.Column> columns) {
        Map<String, Object> dataMap = new HashMap<>();
        for (CanalEntry.Column column : columns) {
            dataMap.put(column.getName(), column.getValue());
        }
//        log.debug("insert data:{}", columns);
        log.debug("insert map data:{}", dataMap);
    }
}

4) 定义消息的消费CanalService:

package com.example.spring_canal.listener;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

@Slf4j
@Component
public class CanalService {


    @Value("${canal.client.subscribe.filter}")
    private String canalFilter;
    @Value("${canal.client.batch.size}")
    private int batchSize;


    @Autowired
    private CanalConnector canalConnector;
    @Autowired
    private CanalListener canalListener;


    @PostConstruct
    public void run() {
        // 定义最后消费的位点
        long lastOffset = fetchFromPosition();

        while (true) {
            Message message = canalConnector.getWithoutAck(batchSize);
            long batchId = message.getId();
            List<CanalEntry.Entry> entryList = message.getEntries();
            int size = message.getEntries().size();
            if (batchId == -1 || entryList.isEmpty()) {
                try {
                    // 线程休眠2秒
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            long nowOffset = entryList.get(0).getHeader().getLogfileOffset();
            if (nowOffset <= lastOffset) {
                continue;
            }
            try {
                canalListener.onMessage(message);
                canalConnector.ack(batchId);
                // 保存最后消费的位点
                lastOffset = message.getEntries().get(size - 1).getHeader().getLogfileOffset();
                savePositionState(lastOffset);
            } catch (Exception ex) {
                log.error("consume error:{}", ex.getMessage());
            }

        }
    }

    // 获取并设置消费的起始位点
    private long fetchFromPosition() {
        // Canal 连接器连接
        canalConnector.connect();
        // 订阅数据变更
        canalConnector.subscribe(canalFilter);
        // 从存储中获取上次消费的位点
        long position = getPositionState();
        if (position != -1) {
            // 回滚到上次保存的位点
            canalConnector.rollback(position);
        }
        return position;
    }

    // 获取位点状态
    private static long getPositionState() {
        // TODO: 从存储中获取上次消费的位点
        return -1;
    }

    // 保存位点状态
    private static void savePositionState(long position) {
        // TODO: 将 position 保存到存储中
    }


}


至此消息消费的代码完成,从消费端可以看到,通过定义好的CanalConnector 一直向canal server 去拉取消息,完成消费,并且提交消息的ack,并将消费到的最新位点进行保存 ;

3 总结:

  • Canal 客户端通过不断向canal server 拉取对应topic 下的消息(pull 模型),从而获取到mysql 表数据修改的数据;并且当多个客户端,连接到同一个canal 服务端,如果此时客户端感兴趣的数据库和表是相同的,则只有一个客户端能够接收到具体的Binlog数据,从而避免重复消费;

  • 如果canal客户端不提交ack,那么代表的是这些binlog事件没有被消费,并不会被标记为已处理,这样会导致重复消费数据和数据的延迟;当canal客户端从canal server拉取到binlog事件后,如果不手动提交ack确认信息,则canal server会认为这些事件未被消费,继续等待客户端发出确认信息。如果到了超时时间仍未收到确认信息,则canal server会将这些事件重新推送给客户端,从而导致重复消费数据。
    此外,如果canal客户端不提交ack,那么可以会导致数据的延迟。因为canal server会认为客户端没有处理完当前批次的事件,不会主动推送新的事件给客户端,直到当前批次全部处理完毕并提交ack确认为止。如果客户端一直不提交ack,那么canal server就不能推送新的binlog事件,从而导致实时性受到影响。

  • 即使拉取的数量没有达到canal.client.batch.size,客户端也会立即返回进行消费;

4 参考:
4.1 ClientExample 使用官网;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/664623.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot实战:拦截器和监听器的应用指南

当使用Spring Boot时&#xff0c;我们可以通过拦截器&#xff08;Interceptor&#xff09;和监听器&#xff08;Listener&#xff09;来实现对请求和响应的处理。拦截器和监听器提供了一种可插拔的机制&#xff0c;用于在请求处理过程中进行自定义操作&#xff0c;例如记录日志…

使用自动化测试获取手机短信验证码

目前在职测试开发,,写一些脚本,个人认为这职业不科学不应该有的职业,测试就是测试,开发就是开发,运维还是老鸟,这行业总能折腾些莫名其妙的东西出来,刚做这行时学的第一门语言是bash shell, 去新去单位上班直接写了个一键搭建测试环境的测试脚本,本来不想干测试了,好好做微信小…

Linux 学习记录36(C高级篇)

Linux 学习记录36(C高级篇) 本文目录 Linux 学习记录36(C高级篇)一、文件相关指令1. chmod 修改文件用户权限(1. 权限字母表示法(2. 权限8进制表示法 2. 修改文件所属组(1. chgrp(2. chown 能够同时修改多个(3. 创建链接文件>1 ln创建硬链接文件>2 ln -s 创建软链接文件 …

7DGroup性能实施项目日记1

壬寅年 己酉月 丁丑日 2022年9月21日 晴 经过上周的7DGroup学员群内部沟通&#xff0c;我们决定启动一个性能实施项目。 在这个实施项目中&#xff0c;把RESAR性能工程的每个环节都落地一遍&#xff0c;让所有参与培训的学员都可以参与。 在这个项目实施过程中&#xff0c;我打…

苹果照片传输到电脑怎么传?批量传输的技巧!

苹果照片传输到电脑怎么传&#xff1f;照片是苹果手机和电脑之间传输比较频繁的内容。对于刚接触苹果手机的朋友&#xff0c;可能还不是很了解传输方法&#xff0c;鉴于此&#xff0c;我们在这里提供几种有效方法来帮助您完成此项任务。无论您想要使用或不使用 iTunes 将照片从…

CAN Frame详解

CAN Frame是CAN总线通信的基本单位&#xff0c;它有多种类型&#xff0c;其中最常见的是数据帧&#xff08;Data Frame&#xff09;&#xff0c;用于传输数据。数据帧有标准格式&#xff08;Standard Format&#xff09;和扩展格式&#xff08;Extended Format&#xff09;&…

redhat 6.4安装oracle11g RAC (一)

&#xff08;一&#xff09;基础环境 虚拟机环境 &#xff1a;vmware workstation 12 操作系统 &#xff1a; redhat6.4 - 64bit 数据库版本 &#xff1a;11.2.0.4 Last login: Fri Jun 16 18:40:20 2023 from 192.168.186.1 [rootrhel64 ~]# cat /etc/redhat-release Red Ha…

项目描述1

学成在线- 6分片上传&#xff0c;8xxl-job 课程模块开发 分布式事务&#xff0c;消息表 spring-security oauth 用户认证授权 学成在线 学成在线认证授权 一些代码 黑马分布式事物 框架学习 - 若依 / RuoYi-Vue-Plus 统一数据权限 若依数据权限使用 数据权限表结构 1. 设备故…

音视频技术开发周刊 | 298

每周一期&#xff0c;纵览音视频技术领域的干货。 新闻投稿&#xff1a;contributelivevideostack.com。 AI艺术在北京798&#xff0c;展望人工智能与环境的未来 本文很有意思的提出了个假设&#xff0c;通过人工智能和艺术家协作与实践产生环绕地球的叙事&#xff0c;去开启置…

【事故致因】HFACS模型各层级中因素的具体含义及内容归纳

HFACS模型各层级中因素的具体含义及内容归纳 1 HFACS(2000版本)中英文结构图2 定义3 结构组成4 各层级因素及内容4.1 不安全行为4.2 不安全行为的前提条件4.3 不安全监督4.4 组织影响 5 HFACS框架的使用 1 HFACS(2000版本)中英文结构图 英文版本&#xff08;论文首次提出原图&…

接口测试基础知识(使用 Fiddler 抓包、使用 Postman 发起请求、Postman汉化教程)

文章目录 一、 什么是接口二、接口测试的流程三、设计接口测试测试用例常见的点四、使用 Fiddler 进行抓包1. 一个重要设置2. 进行抓包 五、使用 Postman 发起 GET 请求1. 通过API文档得到调用信息2. 通过 Postman 构造 GET 请求 六、使用 Postman 发起 POST 请求1. 得到接口调…

机器学习——朴素贝叶斯(手动代码实现)

朴素的我&#xff0c;决定朴素地徒手实现贝叶斯算法&#xff01; 摒弃sklearn 这个体贴善解人意把一切都打包封装好的妈妈 再见了sklearn 妈妈 我要自己手动实现 哪怕前方困难重重 哪怕我此刻还在发牢骚 但我还是要说&#xff0c;撒哟娜拉sklearn妈 看了知乎阿婆主的分析&#…

在 Maya、ZBrush 和 Substance 3D 中创建女枪手(P2)

今天瑞云渲染小编给大家带来了Ivan Lim 的Female Gunslinger 项目的细目&#xff0c;讨论了他在 Think Tank 的教育过程&#xff0c;并解释了他为什么选择虚幻引擎来呈现这个角色。这篇接着上篇继续拓扑UV、灯光材质、渲染等方面的分享 头发 我开始用一个块来处理头发&#xf…

看看人家那高并发秒杀系统,那叫一个优雅

618&#xff0c;大家剁手了么&#xff1f; 说起618&#xff0c;就不得不提其中较为复杂的秒杀环节了。虽说秒杀只是一个促销活动&#xff0c;但对技术要求不低。 秒杀作为618、双十一等电商活动不可缺少的一环&#xff0c;是一个非常典型的活动场景。秒杀场景的业务特点是限时…

并发知识学习

aqs中有2个队列&#xff0c;一个是同步队列&#xff0c;另外一个是条件队列简单记住&#xff1a;独占没有朋友&#xff0c;所以是null。共享就有朋友&#xff0c;所以是固定的node对象。nextWaiter就是表示节点的模式&#xff0c;在条件队列中指向下一个节点。 一个想要去获取锁…

车载测试范例,如何进行ADAS执行器性能测测试?

概述 执行器性能分为横向性能和纵向性能&#xff0c;横向性能主要指方向盘转向的响应性能&#xff0c;纵向主要包括油门加速性能及刹车减速性能。其中横向性能在ADAS中涉及的功能包括LKA、LDW&#xff0c;跟纵向加/减速性能相关的功能主要是ACC&#xff08;自适应巡航&#xf…

C++数据结构【树状数组】

​ 树状数组 什么是树状数组&#xff1f;树状数组和线段树的区别 树状数组的结构什么是lowbitlowbit如何计算代码实现&#xff1a;补充知识——&&#xff0c;|&#xff0c;^运算&|^注意&#xff1a; 树状数组的基本操作单点修改while循环版代码for循环版代码 单点查询区…

pytest生成 junit-xml 测试报告

pytest 生成junit-xml 测试报告&#xff0c;那么生成的xml报告有什么用呢&#xff1f;可以集合一些持续集成工具&#xff08;如jenkins…等&#xff09;方便查看报告。 junit-xml 测试报告 命令行参数有2个跟 junit-xml 报告相关的参数 --junit-xmlpath create junit-xm…

2023年软件测试——精选金融银行面试真题

1、P2P你们也测试后台管理吗&#xff1f;个人芝麻信用积分是调取哪里的资料&#xff1f; 测试后台管理&#xff1a; 后台也测&#xff0c;但是我主要测试前台&#xff0c;我的关注点是前台&#xff0c;后台只是拿来用&#xff0c;能配合前台正常走完流程就行。 后台主要对前…

Linux线程同步(下)

文章目录 1. POSIX信号量2. 基于环形队列的生产消费模型2.1 代码实现2.1.1 构造函数和析构函数2.1.2 生产和消费2.1.3 测试 3. 线程池3.1 成员变量3.2 构造和析构3.3 push和pop3.4 启动线程池3.5 测试 4. 将线程池改成单例模式5. STL、智能指针和线程安全6. 其他常见的各种锁6.…