Canal 结合 SpringBoot 源码梳理

news2025/1/12 2:50:43

1、canal是什么,可以用来作什么

        canal是阿里开源的一个用于监听数据库binlog,从而实现数据同步的工具。

2、安装

        我使用的是1.1.5版本,太高的版本需要的jdk版本和mysql的驱动版本会更高,可以根据自己的环境选择。
        如果是自己玩的话安装 canal.deployer-1.1.5.tar.gz就可以了
                地址: Release v1.1.5 · alibaba/canal · GitHub

3、springboot+mysql+canal实现数据同步可以在网上找到很多博客,不在赘述

4、源码梳理

(1)、既然用到springboot,肯定有一个自动注入的autoconfigure的start。

        可以看到spring.factories会自动注入几个client。

(2)、找到一个看着顺眼的client进去看看:

  • 我选择的是SimpleClientAutoConfiguration
@Configuration
@EnableConfigurationProperties({CanalSimpleProperties.class})
@ConditionalOnBean({EntryHandler.class})
@ConditionalOnProperty(
    value = {"canal.mode"},
    havingValue = "simple",
    matchIfMissing = true
)
@Import({ThreadPoolAutoConfiguration.class})
public class SimpleClientAutoConfiguration {
    private CanalSimpleProperties canalSimpleProperties;


    public SimpleClientAutoConfiguration(CanalSimpleProperties canalSimpleProperties) {
        this.canalSimpleProperties = canalSimpleProperties;
    }


    @Bean
    public RowDataHandler<RowData> rowDataHandler() {
        return new RowDataHandlerImpl(new EntryColumnModelFactory());
    }


    @Bean
    @ConditionalOnProperty(
        value = {"canal.async"},
        havingValue = "true",
        matchIfMissing = true
    )
    public MessageHandler messageHandler(RowDataHandler<RowData> rowDataHandler, List<EntryHandler> entryHandlers, ExecutorService executorService) {
        return new AsyncMessageHandlerImpl(entryHandlers, rowDataHandler, executorService);
    }


    @Bean
    @ConditionalOnProperty(
        value = {"canal.async"},
        havingValue = "false"
    )
    public MessageHandler messageHandler(RowDataHandler<RowData> rowDataHandler, List<EntryHandler> entryHandlers) {
        return new SyncMessageHandlerImpl(entryHandlers, rowDataHandler);
    }


    @Bean(
        initMethod = "start",
        destroyMethod = "stop"
    )
    public SimpleCanalClient simpleCanalClient(MessageHandler messageHandler) {
        String server = this.canalSimpleProperties.getServer();
        String[] array = server.split(":");
        return SimpleCanalClient.builder().hostname(array[0]).port(Integer.parseInt(array[1])).destination(this.canalSimpleProperties.getDestination()).userName(this.canalSimpleProperties.getUserName()).password(this.canalSimpleProperties.getPassword()).messageHandler(messageHandler).batchSize(this.canalSimpleProperties.getBatchSize()).filter(this.canalSimpleProperties.getFilter()).timeout(this.canalSimpleProperties.getTimeout()).unit(this.canalSimpleProperties.getUnit()).build();
    }
}
        看到会注入SimpleCanalClient。并且指明了初始化方法和销毁的方法。进去看看。发现是继承了一个抽象的client,这个类是关键,内部有start和stop的具体实现。
        很明显,start就是启动一个线程 while(true)的去循环执行binlog的获取和处理。
如何获取的代码没有跟进,但是可以猜到,应该是通过连接然后去获取数据。
  • 这里着重看一下处理数据的代码:
public abstract class AbstractMessageHandler implements MessageHandler<Message> {






    private Map<String, EntryHandler> tableHandlerMap;






    private RowDataHandler<CanalEntry.RowData> rowDataHandler;




    public  AbstractMessageHandler(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) {
        this.tableHandlerMap = HandlerUtil.getTableHandlerMap(entryHandlers);
        this.rowDataHandler = rowDataHandler;
    }


    @Override
    public  void handleMessage(Message message) {
        List<CanalEntry.Entry> entries = message.getEntries();  第一步 
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {  第二步
                try {
                    EntryHandler<?> entryHandler = HandlerUtil.getEntryHandler(tableHandlerMap, entry.getHeader().getTableName());   第三步
                    if(entryHandler!=null){
                        CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName())
                                .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();
                        CanalContext.setModel(model);
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();    第四步
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        for (CanalEntry.RowData rowData : rowDataList) {
                            rowDataHandler.handlerRowData(rowData,entryHandler,eventType);
                        }
                    }
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }finally {
                   CanalContext.removeModel();
                }


            }
        }
    }




}
  • 进入rowDataHandler.handlerRowData(maps, entryHandler, eventType);实现类选择的是RowDataHandlerImpl。
public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> {






    private IModelFactory<List<CanalEntry.Column>> modelFactory;








    public RowDataHandlerImpl(IModelFactory modelFactory) {
        this.modelFactory = modelFactory;
    }


    @Override
    public <R> void handlerRowData(CanalEntry.RowData rowData, EntryHandler<R> entryHandler, CanalEntry.EventType eventType) throws Exception {
        if (entryHandler != null) {
            switch (eventType) {
                case INSERT:
                    R object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated)
                            .map(CanalEntry.Column::getName).collect(Collectors.toSet());
                    R before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);
                    R after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    R o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}
        回想一下springboot中使用canal的时候,会有一个注解@CanalTable和一个实现类EntryHandler。
        这里的代码要做的就是(1)、匹配合适的语句类型(insert、delete、update)。(2)、insert和delete只需要记录一下操作的值;update需要记录一下修改前和修改后的值。也很好理解,insert和delete回滚只需要反向重放代码就行,而update需要知道之前的数据采集重新update。
  • 进入newInstance方法,选择AbstractModelFactory:
public abstract class AbstractModelFactory<T> implements IModelFactory<T> {




    @Override
    public <R> R newInstance(EntryHandler entryHandler, T t) throws Exception {
        String canalTableName = HandlerUtil.getCanalTableName(entryHandler);
        if (TableNameEnum.ALL.name().toLowerCase().equals(canalTableName)) {
            return (R) t;
        }
        Class<R> tableClass = GenericUtil.getTableClass(entryHandler);
        if (tableClass != null) {
            return newInstance(tableClass, t);
        }
        return null;
    }




    abstract <R> R newInstance(Class<R> c, T t) throws Exception;
}

        重点来了,有两个HandlerUtil.getCanalTableName和GenericUtil.getTableClass。还记得咱们再springboot中的代码会指定 @CanalTable 处理的是那个表和EntryHandler泛型吗。
第一步判断这个EntryHandler实现类有没有指定要处理那个表,如果指定了All。那么就要就走自定义的返回值,这个返回值通常不是我们需要的。所以在使用中一定尽量指定要处理的表。
第二步需要匹配EntryHandler中的泛型类进行赋值操作了。
  • 最后进入newInstance方法:
public class EntryColumnModelFactory extends AbstractModelFactory<List<CanalEntry.Column>> {

......

    @Override
    <R> R newInstance(Class<R> c, List<CanalEntry.Column> columns) throws Exception {
        R object = c.newInstance();
        Map<String, String> columnNames = EntryUtil.getFieldName(object.getClass());
        for (CanalEntry.Column column : columns) {
            String fieldName = columnNames.get(column.getName());
            if (StringUtils.isNotEmpty(fieldName)) {
                FieldUtil.setFieldValue(object, fieldName, column.getValue());
            }
        }
        return object;
    }




}

        代码比较简单,通过反射给对象赋值。如果不太清楚这里是怎么把数据解析出来的,可以自己搭建起来服务执行一下看看canal返回的结构体,我下边也提出来我的返回,并且我也会将上边代码中和数据解析的地方标红。
获取消息 Message[id=14,entries=[header {
  version: 1
  logfileName: "mysql-bin.000004"
  logfileOffset: 19806
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1706838103000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 80
}
entryType: TRANSACTIONBEGIN
storeValue: " 9"
, header {
  version: 1
  logfileName: "mysql-bin.000004"
  logfileOffset: 19939
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1706838103000
  sourceType: MYSQL
  schemaName: "test"
  tableName: "first"
  eventLength: 53
  eventType: INSERT
  props {
    key: "rowsCount"
    value: "1"
  }
}
entryType: ROWDATA
storeValue: "\b\341\001\020\001P\000b\203\001\022\"\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0010\000B\0016R\006bigint\022%\b\001\020\f\032\aaddress \000(\0010\000B\003333R\vvarchar(10)\0226\b\002\020]\032\vcreate_time \000(\0010\000B\0232024-02-02 09:41:43R\bdatetime"
, header {
  version: 1
  logfileName: "mysql-bin.000004"
  logfileOffset: 19992
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1706838103000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\006381841"
],raw=false,rawEntries=[]]

        至此,在springboot中通过canal获取binlog的日志并且解析为自定义的entry对象的流程就已经分析、梳理完了。至于后续要怎么处理就有很多的方式了。
        最后在分享一个idea跟踪源码的小技巧:
        比如我们看到一个比较重要的注解,但是不知道这个注解具体实现在哪里,可以进入注解中,选中注解名称,然后选择Find Usages。就可以看到哪里使用了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1429647.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

24.云原生之ArgoCD钩子

云原生专栏大纲 文章目录 Argo CD钩子如何定义钩子钩子删除策略 Argo CD钩子 Argo CD 是一个用于部署和管理 Kubernetes 应用程序的工具&#xff0c;它提供了一种声明式的方式来定义和自动化应用程序的部署过程。Argo CD 钩子&#xff08;Hooks&#xff09;是一种机制&#x…

MySQL-- if()函数 简单明了

if 主要有&#xff1a;IF函数嵌套和IF聚合函数 两类&#xff0c;主要是用来根据条件返回不同值。 基本语法为; IF(条件表达式,值1,值2)如果条件表达式为True&#xff0c;返回值1&#xff0c;为False,返回值2.返回值可以是任何值&#xff0c;比如&#xff1a;数值&#xff0c;…

消息中间件之RocketMQ源码分析(五)

消费进度保存机制 消费者启动时会同时启动位点管理器&#xff0c;RocketMQ设计了远程位点管理和本地位点管理 两种位点管理方式. 集群消费时&#xff0c;位点由客户端提交给Broker保存. 广播消费时&#xff0c;位点保存在消费者本地磁盘上 OffsetStore接口核心方法 void load(…

重写Sylar基于协程的服务器(4、协程调度模块的设计)

重写Sylar基于协程的服务器&#xff08;4、协程调度模块的设计&#xff09; 重写Sylar基于协程的服务器系列&#xff1a; 重写Sylar基于协程的服务器&#xff08;0、搭建开发环境以及项目框架 || 下载编译简化版Sylar&#xff09; 重写Sylar基于协程的服务器&#xff08;1、日…

智慧文旅:驱动文化与旅游融合发展的新动力

随着科技的快速发展和人们生活水平的提高&#xff0c;文化和旅游的融合成为了时代发展的必然趋势。智慧文旅作为这一趋势的引领者&#xff0c;通过先进的信息技术手段&#xff0c;推动文化与旅游的深度融合&#xff0c;为产业的发展注入新的活力。本文将深入探讨智慧文旅如何成…

被人疯狂吐槽的预制菜,居然是资本看重的“万亿级”市场?

被人疯狂吐槽的预制菜&#xff0c;居然是资本看重的“万亿级”市场&#xff1f; 文丨微三云营销总监胡佳东&#xff0c;点击上方“关注”&#xff0c;为你分享市场商业模式电商干货。 - 大家是不是以为只有被天天吐槽难吃的外卖和小饭店&#xff0c;才会用预制菜&#xff0c;…

【LeetCode】17. 电话号码的字母组合(中等)——代码随想录算法训练营Day25

题目链接&#xff1a;17. 电话号码的字母组合 题目描述 给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 示例 1&#xff…

基于springboot校园二手书交易管理系统源码和论文

在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括乐校园二手书交易管理系统的网络应用&#xff0c;在外国二手书交易管理系统已经是很普遍的方式&#xff0c;不过国内的管理系统可能还处于起步阶段。乐校园二手书交易管理系统…

dubbo+sentinel最简集成实例

说明 在集成seata后&#xff0c;下面来集成sentinel进行服务链路追踪管理&#xff5e; 背景 sample-front网关服务已配置好 集成 一、启动sentinel.jar 1、官网下载 选择1:在本地启动 nohup java -Dserver.port8082 -Dcsp.sentinel.dashboard.serverlocalhost:8082 -Dp…

【C语言】数组的应用:扫雷游戏(包含扩展和标记功能)附完整源代码

这个代码还是比较长的&#xff0c;为了增加可读性&#xff0c;我们还是把他的功能分装到了test.c&#xff0c;game.c&#xff0c;game.h里面。 扫雷游戏的规则相信大家来阅读本文之前已经知晓了&#xff0c;如果点到雷就输了&#xff0c;如果不是雷&#xff0c;点到的格子会显…

Kotlin快速入门系列8

Kotlin的泛型 与Java一样&#xff0c;Kotlin也提供泛型。泛型&#xff0c;即 "参数化类型"&#xff0c;将类型参数化&#xff0c;可以用在类&#xff0c;接口&#xff0c;方法上。可以为类型安全提供保证&#xff0c;消除类型强转的烦恼。声明泛型类的格式如下&…

关于反爬虫的的概述

目录 前言 一、验证码验证 二、IP限制 三、User-Agent限制 四、动态页面加载 总结 前言 反爬虫是一种防止网站被自动程序&#xff08;爬虫&#xff09;访问和抓取数据的技术手段。在网络爬虫的发展和使用过程中&#xff0c;有一部分爬虫是用于非法获取网站数据、侵犯隐私…

重写Sylar基于协程的服务器(5、IO协程调度模块的设计)

重写Sylar基于协程的服务器&#xff08;5、IO协程调度模块的设计&#xff09; 重写Sylar基于协程的服务器系列&#xff1a; 重写Sylar基于协程的服务器&#xff08;0、搭建开发环境以及项目框架 || 下载编译简化版Sylar&#xff09; 重写Sylar基于协程的服务器&#xff08;1、…

【C/C++ 10】扫雷小游戏

一、题目 写一个扫雷小游戏&#xff0c;每次输入一个坐标&#xff0c;若该处是地雷&#xff0c;则游戏失败&#xff0c;若该处不是地雷&#xff0c;则显示周围地雷数量&#xff0c;若扫除全部非地雷区域&#xff0c;则扫雷成功。 二、算法 设置两张地图&#xff08;二维数组&…

校园墙表白墙系统uniapp微信小程序

配置文件 (自动编号、配置参数名称、配置参数值)&#xff1b; 前端开发:vue 语言&#xff1a;javapythonnodejsphp均支持 运行软件:idea/eclipse/vscode/pycharm/wamp均支持 框架支持:Ssm/django/flask/thinkphp/springboot/springcloud均支持 数据库 mysql 数据库工具&#x…

洛谷P1002 过河卒(简单DP)

[NOIP2002 普及组] 过河卒 题目描述 棋盘上 A A A 点有一个过河卒&#xff0c;需要走到目标 B B B 点。卒行走的规则&#xff1a;可以向下、或者向右。同时在棋盘上 C C C 点有一个对方的马&#xff0c;该马所在的点和所有跳跃一步可达的点称为对方马的控制点。因此称之为…

PKG系统安装包及IPSW固件:MacOS 11-14 Sonoma 正式版

MacOS 14 Sonoma&#xff0c;为提高生产力和创造力带来了全新的功能&#xff0c;有了更多使用小部件和令人惊叹的新屏幕保护程序进行个性化设置的方法&#xff0c;对Safari浏览器和视频会议进行了重大更新&#xff0c;以及优化的游戏体验——Mac体验比以往任何时候都更好。 mac…

2024美赛预测算法 | 回归预测 | Matlab基于RIME-LSSVM霜冰算法优化最小二乘支持向量机的数据多输入单输出回归预测

2024美赛预测算法 | 回归预测 | Matlab基于RIME-LSSVM霜冰算法优化最小二乘支持向量机的数据多输入单输出回归预测 目录 2024美赛预测算法 | 回归预测 | Matlab基于RIME-LSSVM霜冰算法优化最小二乘支持向量机的数据多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效…

计算huggingface模型占用硬盘空间的实战代码

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

【Java程序设计】【C00232】基于Springboot的抗疫物资管理系统(有论文)

基于Springboot的抗疫物资管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的抗疫物资管理系统 用户主要分为管理员和普通用户 管理员&#xff1a; 管理员可以对后台数据进行管理、拥有最高权限、具体权限有…