「从零单排canal 07」 parser模块源码解析

news2025/1/11 7:07:01

本文将对canal的binlog订阅模块parser进行分析。

parser模块(绿色部分)在整个系统中的角色如下图所示,用来订阅binlog事件,然后通过sink投递到store.

parser模块应该来说是整个项目里面比较复杂的模块,代码非常多。

因此,本文根据过程中的主线来进行展开分析,从 启动 开始,进行分析。

如果读者有其他相关内容不明白的,可以给我留言,我会进行解答或者根据情况再单独写相关内容。

模块内的类如下:

重点需要关注几个核心问题

  • 如何抓取binlog
  • 对binlog消息处理做了怎样的性能优化
  • 如何控制位点信息
  • 如何兼容阿里云RDS的高可用模式下的主备切换问题

1.从启动进入parser主流程

前面的文章我们已经提到了,instance启动的是,会按照顺序启动instance的各个模块

parser模块就是在这里开始的。

这里需要注意一下,在beforeStartEventParser方法中,启动了parser的两个相关组件CanalLogPositionManager 和 CanalHAController,这里先分别介绍一下。

  • CanalLogPositionManager:管理位点信息
  • CanalHAController:instance连接源数据库的心跳检测,并实现数据库的HA(如果配置了standby的数据库)

1.1 位点信息管理CanalLogPositionManager

我们用的是default-instance.xml的配置,所以实际实现类是FailbackLogPositionManager

这里构造器有两个入参,一个是primary的MemoryLogPositionManager,一个是second的MetaLogPositionManager。

前者是内存的位点信息,后者我们我们看一下构造器的metaManager是基于zk的位点信息管理器。

所以FailbackLogPositionManager逻辑也比较简单,获取位点信息时,先尝试从内存memory中找到lastest position,如果不存在才尝试找一下zookeeper里的位点信息。

1.2 心跳控制器CanalHAController

我们用的是default-instance.xml的配置,所以实际实现类是HeartBeatHAController

HeartBeatHAController里面没有特别复杂的逻辑,就是实现了心跳检测成功的onSuccess方法和onFail方法。另外维护了一个CanalHASwitchable对象,一旦心跳检测失败超过一定次数,就执行doSwitch()进行主备切换。

前提是我们要设置了主备数据库的连接信息。

这里的代码写的真的是有点混乱,居然是用MysqlEventParser实现了这个doSwitch()方法。

另外,在MysqlEventParser中,写了一个MysqlDetectingTimeTask内部类,集成了TimerTask来做定时心跳检测。

  • 定时去连接数据库,可以通过select\desc\show\explain等方法做存活检测
  • 如果检测成功,就调用HeartBeatHAController的onSuccess方法
  • 如果失败,就HeartBeatHAController的onFail方法
  • 如果失败超过一定次数,onFail方法中就会调用doSwitch方法进行主备切换

2.核心逻辑

从parser.start()进去后,我们就来到了parser的核心。

从default-instance.xml配置文件看,默认的parser实现是从base-instance.xml的baseEventParser来的,用的是RdsBinlogEventParserProxy类。

我们看下这个类图的结构。

从这个结构来看,我们从上到下,就能对parser模块的主体逻辑进行抽丝剥茧了。

Let’s go!

2.1 CanalEventParser接口

定义了一个空的接口

2.2 AbstractEventParser抽象类

这个类里面代码非常多,我们重点关注核心流程。

2.2.1 构造器AbstractEventParser()

构造器里面就只做了一件事情,创建了一个EventTransactionBuffer。

EventTransactionBuffer这个类顾名思义就是一个缓冲buffer,它的作用源码里的注释也很清楚,它是缓冲event队列,提供按事务刷新数据的机制。

那对于这里构造器中实现的TransactionFlushCallback的flush(List<CanalEntry.Entry> transaction) 方法,肯定就是对于事务中的一系列event,刷新到store中。

我们可以看下consumeTheEventAndProfilingIfNecessary(transaction)方法,跟我们想的一样,具体的sink方法放在后面的sink模块再展开分析。

2.2.2 主干的start()方法

主要做了这些事情:

  • 初始化缓冲队列transactionBuffer
  • 初始化binlogParser
  • 启动一个新的线程进行核心工作
    • 构造Erosa连接ErosaConnection
    • 利用ErosaConnection启动一个心跳线程
    • 执行dump前的准备工作,查看数据库的binlog_format和binlog_row_image,准备一下DatabaseTableMeta
    • findStartPosition获取最后的位置信息(挺重要的,具体实现在MysqlEventParser)
    • 构建一个sinkHandler,实现具体的sink逻辑(我们可以看到,里面就是把单个event事件写入到transactionBuffer中)
    • 开始dump过程,默认是parallel处理的,需要构建一个MultiStageCoprocessor;如果不是parallel,就直接用sinkHandler处理。内部while不断循环,根据是否parallel,选择MultiStageCoprocessor或者sinkHandler进行投递。
  • 如果有异常抛出,那么根据异常类型做相关处理,然后退出sink消费,释放一下状态,sleep一段时间后重新开始

代码很长,逻辑比较清晰,就不贴了。

2.2.3 核心dump过程

dump过程默认是parallel处理的,需要构建一个MultiStageCoprocessor;如果不是parallel,就直接用sinkHandler处理。内部while不断循环,根据是否parallel,选择MultiStageCoprocessor或者sinkHandler进行投递。

注意multiStageCoprocessor在这里start启动。

代码如下

dump方法在MysqlConnection类中实现,主要就是把自己注册到数据库作为一个slave,然后获取binlog变更,具体到协议我们就不展开分析了。

通过fetcher抓取到event,然后调用sink投递到store。

注意,parallel为false的,是单线程交给sinkHandler处理,parallel为true的,交给MultiStageCoprocessor的coprocessor.publish(buffer)处理,后面展开分析下并行处理的逻辑。

注意multiStageCoprocessor在这里publish进行写入RingBuffer,下文会详细讲下这里的机制。

2.3 AbstractMysqlEventParser抽象类

这个类比较简单,就是做了根据配置做了一些对象创建和设置的工作,比如BinlogParser的构建、filter的设置等

2.4 MysqlEventParser实现类

总共有将近1000行代码,里面其实代码组织有点混乱。像前面提到的MysqlDetectingTimeTask内部类、HeartBeatHAController的部分方法实现,都是在这个类里面的。

那抛开这些来说,这个类的主要功能还是在处理根据journalName、position、timestamp等配置查找对应的binlog位点。

我们选取核心流程里面的关键逻辑 findStartPostion( ) 方法进行分析即可。

这个是AbstractEventParser类中start方法中调用的,获取dump起始位点。

我们默认是使用 非GTID mode记录位点信息的,所以直接看下来看下findStartPositionInternal( ) 具体逻辑,这里可以了解到如何正确配置位点信息:

  • logPositionManager找历史记录
  • 如果没有找到
    • 如果instance没有配置canal.instance.master.journal.name
      • 如果instance配置了canal.instance.master.timestamp,就按照时间戳去binlog查找
      • 如果没有配置timestamp,就返回数据库binlog最新的位点
    • 如果instance配置了canal.instance.master.journal.name
      • 如果instance配置了canal.instance.master.position,那就根据journalName和position获取bingo位点信息
      • 如果配置了timestamp,就用journalName + timestamp形式获取位点信息
  • 如果找到了历史记录
    • 如果历史记录的连接信息和当前连接信息一致,那么判断下是否有异常,没有异常就直接返回
    • 如果历史记录的连接信息和当前连接信息不一致,说明可能发生主备切换,就把历史记录的时间戳回退一分钟,重新查询

这里是纯if else 流程代码,挺长的,就不贴了。

在这个过程中,调用了几个有意思的方法,可以了解一下

  • findServerId( ):查询当前db的serverId信息,mysql命令为 show variables like ‘server_id’
  • findEndPosition():查询当前的binlog位置,mysql命令为 show master status
  • findStartPosition():查询当前的binlog位置,mysql命令为 show binlog events limit 1
  • findSlavePosition():查询当前的slave视图的binlog位置,mysql命令为 show slave status

2.5 RdsBinlogEventParserProxy实现类

这个类比较简单,就是canal为阿里云rds定制的一个代理实现类。

主要解决了云rds本身高可用架构下,服务端HA切换后导致的binlog位点信息切换。

所以对于抛出的异常做了一定的处理,兼容了这种服务端HA的情况。

同时,也能满足rds的备份文件指定位点开始增量消费的特性。

主要过程如下

  • 如果抛出了PositionNotFoundException异常,就委托rdsLocalBinlogEventParser进行处理
  • rdsLocalBinlogEventParser会通过下载binlog的oss备份,找到目标binlog文件和位置

3.事件处理优化 MultiStageCoprocessor

我们前面说过,parallel为false的,是单线程交给sinkHandler处理,parallel为true的,交给MultiStageCoprocessor处理。

这里展开看看并行处理是如何实现的。

实现类是MysqlMultiStageCoprocessor, 看下基本结构,持有了EventTransactionBuffer(前文提到过的存储事务内多个evnet的buffer)、RingBuffer、几个线程池、两个BatchEventProcessor。

这些属性类型基本是跟Disruptor框架相关。

start()方法里面对一系列属性做了初始化配置并进行启动,要理解这里的逻辑,其实主要是使用Disruptor框架做的任务队列。

如果了解了Disruptor框架的使用,就能明白这里所做的任务队列处理模型了。

start()源码如下:

首先,这里用了Disruptor框架的典型单生产者-多消费者模型。

这里创建生产者的时候,就创建了RingBuffer和Sequencer,全局唯一。

上面在dump方法内,订阅到binlog事件后,通过multiStageCoprocessor的publish方法写入RingBuffer,作为单一的生产者。

多消费者主要通过Disruptor的Sequencer管理。

Sequencer 接口有两种实现,SingleProducerSequencer 和 MultiProducerSequencer,分别来处理单个生产者和多个生产者的情况,这里使用了SingleProducerSequencer。

在 Sequencer 中有一个 next() 方法,就是这个方法来产生 Sequence 中的 value。Sequence 本质上可以认为是一个 AtomicLong,消费者和生产者都会维护自己的 Sequence。

Sequencer 的核心就是解决了两个问题,第一个是对于所有的消费者,在 RingBuffer 为空时,就不能再从中取数据,对于生产者,新生产的内容不能把未消费的数据覆盖掉。

上图中 C 代表消费者,P 代表生产者。

当然,在多消费者模型中,一个关键的问题是控制消费者的消费顺序。

这里主要通过消费者之间控制依赖关系其实就是控制 sequence 的大小,如果说 C2 消费者 依赖 C1,那就表示 C2 中 Sequence 的值一定小于等于 C1 的 Sequence。

具体的方法是通过RingBuffer的addGatingSequences( )进行的。

具体Disruptor的原理和使用就不展开说明了,这里了解这些关键问题即可。

通过这样的编程模型,parser实现了解析器的多阶段顺序处理。

  • Stage1: 网络接收 (单线程),publish投递到RingBuffer
  • Stage2: 从RingBuffe获取事件,使用SimpleParserStage进行基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息)
  • Stage3: 事件深度解析 ,用workpool进行多线程, 使用DmlParserStage进行DML事件数据的完整解析
  • Stage4: SinkStoreStage单线程投递到store

SimpleParserStage和SinkStoreStage使用了stageExecutor这个线程池进行管理,DmlParserStage使用了workpool进行管理。

这三个类都是MysqlMultiStageCoprocessor的内部类,通过实现OnEvent方法进行逻辑处理,具体处理逻辑就不展开了,大家有兴趣可以看下源码。

4.总结

这个模块是非常核心的,涉及到了对binlog事件的抓取和处理,以及相关位点信息的处理。

回头看开头几个问题,相信也都有了答案:

  • 如何抓取binlog

dump方法在MysqlConnection类中实现,主要就是把自己注册到数据库作为一个slave,然后获取binlog变更(具体的协议我们就不展开分析了)。

  • 对binlog消息处理做了怎样的性能优化

利用disruptor框架,基于RingBuffer实现了

单线程接受 -> 单线程解析事件 -> 多线程深度解析事件 -> 单线程投递store 这样的一个流程。

(这里有点疑惑,单线程接受事件后,为什么需要一个单线程先解析一下再多线程深度解析,而不是直接多线程深度解析?有了解的朋友可以给我留言指点一下,谢谢)

  • 如何控制位点信息

有多种CanalLogPositionManager可以选择。

默认采用FailbackLogPositionManager,获取位点信息时,先尝试从内存memory中找到lastest position,如果不存在才尝试找一下zookeeper里的位点信息。

  • 如何兼容阿里云RDS的高可用模式下的主备切换问题

RdsBinlogEventParserProxy如果发现了PositionNotFoundException异常,就委托rdsLocalBinlogEventParser通过下载binlog的oss备份,找到目标binlog文件和位置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/31525.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nmap爆破MySQL弱口令漏洞:解决报错Accounts: No valid accounts found

nmap工具不仅仅能扫描&#xff0c;也可以暴力破解mysql&#xff0c;ftp&#xff0c;telnet等服务。 看到这里不要怀疑&#xff0c;在kali系统中查一下到底支持哪些暴力破解功能&#xff0c;命令如下 ls /usr/share/nmap/scripts |grep brute.nse 查询结果为 afp-brute.nse …

镜像底层原理详解和基于Docker file创建镜像

目录 一、镜像底层原理 1.联合文件系统(UnionFS) 2.镜像加载原理 3.为什么Docker里的centos的大小才200M? 二、Dockerfile 1.简介 2.Dockerfile操作常用命令 &#xff08;1&#xff09;FORM 镜像 &#xff08;2&#xff09;MAINTAINER 维护人信息 &#xff08;3&…

Vue的devtools安装教程

devtools是一个便于开发者调试Vue代码的插件 先确保你已经安装了node.js 点击此处去github上拉取工具包 安装yarn&#xff08;用npm在打包的时候会失败&#xff0c;使用yarn可以打包成功&#xff09; ① cmd输入&#xff1a;npm install -g yarn 全局安装yarn包管理工具   …

Nature Plants|植物基因组测序20年回顾与展望:三代HiFi基因组时代

2021年11月29日&#xff0c;美国密歇根州立大学在《Nature Plants》期刊在线发表题为“Representation and participation across 20 years of plant genomesequencing”综述&#xff0c;系统阐述了在过去的20年间&#xff0c;对陆地植物基因组学组装质量、已测序物种的分类和地…

Java的几大常用类

一、Object类 超类、基类&#xff0c;所有类的直接或者间接父类&#xff0c;位于继承树的最顶层。 任何类&#xff0c;如果没有写 extends 显示继承某个类&#xff0c;都直接默认继承 Object 类&#xff0c;否则为间接继承。 Object 类中所定义的方法&#xff0c;是所有对象…

DeeplabV3实战:基于tensorflow搭建DeeplabV3实现语义分割任务

任务描述: 语义分割是一种典型的计算机视觉问题,其是将一些图像作为输入并将它们转换为具有突出显示的感兴趣区域的掩模,即图像中的每个像素根据其所属的感兴趣对象被分配类别。如下图中左图所示,其语义是人骑自行车,语义分割的结果如右图所示,粉红色代表人,绿色代表自行…

狂神的MySQL(1)

01、什么是数据库&#xff0c;为什么要学习数据库 javaEE&#xff1a;企业级java开发 Web 前端&#xff08;页面&#xff1a;展示&#xff0c;数据&#xff01;&#xff09; 后台&#xff08;连接点&#xff1a;连接数据库JDBC&#xff0c;连接前端&#xff08;控制&#xf…

web前端-javascript-运算符(介绍说明,算术运算符、+、-、*、/、%,隐式类型转换、转换为String、转换为Number)

文章目录运算符1. 操作符说明1.1. 通过运算符可以对一个或多个值进行运算,并获取运算结果1.2. 它会将该值的类型以字符串的形式返回2. 算数运算符2.1. 当对非 Number 的值进行运算时&#xff0c;会将这些值转换为 Number 然后再运算2.2. 2.3 -2.4 \*2.5 /2.6 %3. 隐式类型转换3…

思科配置VLAN间单臂路由

思科配置VLAN间单臂路由 为什么要配置单臂路由 路由器上链接不同的VLAN的物理接口数量有限&#xff0c;随着VLAN增加端口很快就被耗尽&#xff0c;然而VLAN中继允许单个路由器物理接口接多个VLAN的流量&#xff0c;即有了单臂路由技术。 示例拓扑 本文以下图所示拓扑为例配…

mac误删除文件恢复,mac文件丢失如何找回

mac误删除文件恢复&#xff1f;相信不少朋友一开始遇到这个问题没太放在心上&#xff0c;还以为是自己误删除了&#xff0c;想着以后删除文件的时候尽量小心点&#xff0c;直到发现重要文件被删除后没有经过废纸篓&#xff0c;而被直接删除了&#xff0c;这才引起重视&#xff…

java项目-第160期ssm大学生校园兼职系统_ssm毕业设计_计算机毕业设计

java项目-第160期ssm大学生校园兼职系统_ssm毕业设计_计算机毕业设计 【源码请到资源专栏下载】 今天分享的项目是《ssm大学生校园兼职系统》 该项目分为3个角色&#xff0c;管理员、学生、企业。 学生角色可以访问前台&#xff0c;查看企业信息、招聘信息进行应聘。 企业角色…

Android Material Design之NavigationView(五)

先上图 使用说明 NavigationView与DrawerLayout 成对出现,经常被用作侧滑菜单,因此当我们使用 NavigationView时布局文件的根节点一定是DrawerLayout 属性 NavigationView 属性描述android:id控件idandroid:layout_width控件的长度android:layout_height控件的高度android…

关于web前端大作业的HTML网页设计——我的班级网页HTML+CSS+JavaScript

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

Flink CDC 新一代数据集成框架

前言&#xff1a; 主要讲解了技术原理&#xff0c;入门与生产实践&#xff0c;主要功能&#xff1a;全增量一体化数据集成、实时数据入库入仓、最详细的教程。Flink CDC 是Apache Flink的一个重要组件&#xff0c;主要使用了CDC技术从各种数据库中获取变更流并接入到Flink中&a…

神经网络训练不起来,怎么办?

从optimization的角度&#xff0c;Cross-entropy比Mean Square Error更加适合用在分类上。使用Cross-entropy这个Loss function的时候&#xff0c;pytorch自动帮你把Soft-max加到你的Network的最后一层。 Optimization Critical Point是Saddle Point还是Local Point&#xff1…

【蓝桥杯web】第十四届蓝桥杯(Web应用开发)模拟赛1期-大学组

数据类型检测 请看这篇数据类型检测 渐变色背景生成器 html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" /><meta name&…

华为云安全亮相世界互联网大会

近日&#xff0c;以“共建网络世界 共创数字未来——携手构建网络空间命运共同体”为主题的世界互联网大会在浙江乌镇盛大举行。华为云安全专家团队携全新的安全运营2.0和数据安全解决方案亮相峰会现场&#xff0c;重点展示华为云在安全运营和数据方案的最新实践成果&#xff0…

小啊呜产品读书笔记001:《邱岳的产品手记-11》第21讲 产品案例分析:Fabulous的精致养成

小啊呜产品读书笔记001&#xff1a;《邱岳的产品手记-11》第21讲 产品案例分析&#xff1a;Fabulous的精致养成一、今日阅读计划二、泛读&知识摘录第21讲 产品案例分析&#xff1a;Fabulous的精致养成三、头脑风暴四、思考叮嘟&#xff01;这里是小啊呜的产品进阶读书笔记整…

病毒和战争齐飞,24 届秋招会更惨吗?

2022 壬寅年&#xff0c;病毒和战争齐飞&#xff0c;流言和混乱四起。2023 届秋招的残酷已不需要再多说&#xff0c;各大平台校招社招一片哀嚎&#xff0c;如果说往年的各种帖子是在贩卖焦虑&#xff0c;今年就是实打实的就业寒冬。 先来看段分析&#xff0c;来自阮一峰大佬&a…

刷题看力扣,刷了两个月 leetcode 算法,顺利拿下百度、阿里等大厂的 offer

随着互联网寒潮的到来, 越来越多的互联网公司提高了面试的难度&#xff0c;其中之一就是加大了面试当中手撕算法题的比例。这里说的算法题不是深度学习&#xff0c;机器学习这类的算法&#xff0c;而是排序&#xff0c;广度优先&#xff0c;动态规划这类既考核数据结构也考核编…