基于canal监听MySQL binlog实现数据增量同步

news2025/1/10 12:16:21

一、背景

业务反馈客服消息列表查询速度慢,有时候甚至要差不多20秒,急需优化提升速度。

二、方案

引入

首先,体验系统,发现查询慢的正是消息列表查询接口。

接着去看代码的设计,流程比较长,但从代码逻辑上设计没有问题什么大问题。

接着拿到查库的主SQL,发现连接的表比较多,然后在测试库看索引,索引缺了一些。加上索引,然后确定确实走了预期的索引之后就给正式库加上了索引。速度确实有了较大提升。能够进入十秒内,但因为数据量大,还是需要4~5秒左右。

不过,业务对这个已经比较满意了,但还是提出能否速度更快。优化完索引之后,可以试试上缓存,但细看了一下数据,变化频率不低,不太适合缓存。

接着就想到了本文的主角-----Elastic Search。支持高性能检索,倒排索引的机制,也更适合大数据量的场景。于是就以测试库来尝试,做引入ES的探索。

选型

ES和数据库作为不同的系统,要查询效果一致,首先就要考虑数据的同步问题。

首先对于数据库已有的数据,做全量同步。这个是没有异议的。

但对于增量数据如何处理就引出了几种常用的方案。

方案一

数据库更新的时候,直接同步更新ES。这种方式实现简单,数据同步也及时,但每处更新数据库都要加上更新ES的操作,在后续开发中,很容易会遗漏。(某些时候可能需要用SQL改数据库数据,这种操作无法更新ES)

方案二

数据库更新的时候,就将更新的操作发送到MQ,让MQ异步去更新ES。这种方案相比第一种复用性更高,可维护性更强一些,但还是有缺漏的风险。(某些时候可能需要用SQL改数据库数据,这种操作无法更新ES)

方案三

监听数据库binlog日志,只要有变更记录的操作,就同步更新ES。这种方案实现起来较复杂,但基本写完一套之后,后续基本不需要再变动。

综合考虑下来,我觉得第三种方案是最好的。在之前的学习中,我尝试了给项目引入Easy ES框架来实现ES的引入。

springboot整合easy-es实现数据的增删改查_easy es-CSDN博客

接下来就差数据的全量同步和增量同步了。全量同步可以直接通过工具来实现,而增量同步就需要工具结合代码来实现了。本文就是打算用阿里的canal来实现监听MySQL数据库的binlog日志。

三、原理

MySQL的分布式是基于主从架构实现的。一般情况下是一主多从,其中一个数据库作为主节点,其他数据库作为从节点,主从节点之间通过订阅binlog的方式实现数据同步。

canal的数据增量同步底层就是利用MySQL的主从同步机制实现的。将canal伪装成master的一个slave节点,向master节点发起dump协议,master节点在接收到dump协议之后,就会将binlog日志推送给canal,canal拿到binlog日志之后执行相应的操作从而实现数据同步。

四、配置流程

官方教程

4.1、配置MySQL

查看源MySQL的binlog配置,确保MySQL开启了binlog日志。

SHOW VARIABLES LIKE "%bin%"

日志的格式为ROW

查看源MySQL的server_id

准备好一个拥有slave权限的MySQL账号。

4.2、配置canal

1.下载canalicon-default.png?t=N7T8https://github.com/alibaba/canal/releases

2.解压,配置canal,修改文件conf/example/instance.properties

配置canal的server_id,注意要和上面查看的源MySQL的不一样。

配置源MySQL的ip+端口

配置源MySQL的账号密码

3.启动项目

在bin目录下找到对应系统的启动文件,双击启动。

/bin/startup.bat(window)

/bin/startup.sh(linux)

查看日志文件,检查是否启动成功。logs/canal/canal.log和logs/example/example.log

服务启动成功

4.3、canal集成到springboot

添加依赖

		<dependency>
			<groupId>com.alibaba.otter</groupId>
			<artifactId>canal.client</artifactId>
			<version>1.1.4</version>
		</dependency>

直接用官方的测试代码验证(不需要更改,如果canal安装在本地的话)

package org.jeecg.modules.admin.assignImClassTeacher;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;


public class SimpleCanalClientExample {


	public static void main(String args[]) {
		// 创建链接
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
				11111), "example", "", "");
		int batchSize = 1000;
		int emptyCount = 0;
		try {
			connector.connect();
			connector.subscribe(".*\\..*");
			connector.rollback();
			int totalEmptyCount = 120;
			while (emptyCount < totalEmptyCount) {
				Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
				long batchId = message.getId();
				int size = message.getEntries().size();
				if (batchId == -1 || size == 0) {
					emptyCount++;
					System.out.println("empty count : " + emptyCount);
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
					}
				} else {
					emptyCount = 0;
					// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
					printEntry(message.getEntries());
				}

				connector.ack(batchId); // 提交确认
				// connector.rollback(batchId); // 处理失败, 回滚数据
			}

			System.out.println("empty too many times, exit");
		} finally {
			connector.disconnect();
		}
	}

	/**
	 * 打印canal server解析binlog获得的实体类信息
	 */
	private static void printEntry(List<Entry> entrys) {
		for (Entry entry : entrys) {
			if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
				//开启/关闭事务的实体类型,跳过
				continue;
			}
			//RowChange对象,包含了一行数据变化的所有特征
			//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
			RowChange rowChage;
			try {
				rowChage = RowChange.parseFrom(entry.getStoreValue());
			} catch (Exception e) {
				throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
			}
			//获取操作类型:insert/update/delete类型
			EventType eventType = rowChage.getEventType();
			//打印Header信息
			System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
					entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
					entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
					eventType));
			//判断是否是DDL语句
			if (rowChage.getIsDdl()) {
				System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
			}
			//获取RowChange对象里的每一行数据,打印出来
			for (RowData rowData : rowChage.getRowDatasList()) {
				//如果是删除语句
				if (eventType == EventType.DELETE) {
					printColumn(rowData.getBeforeColumnsList());
					//如果是新增语句
				} else if (eventType == EventType.INSERT) {
					printColumn(rowData.getAfterColumnsList());
					//如果是更新的语句
				} else {
					//变更前的数据
					System.out.println("------->; before");
					printColumn(rowData.getBeforeColumnsList());
					//变更后的数据
					System.out.println("------->; after");
					printColumn(rowData.getAfterColumnsList());
				}
			}
		}
	}

	private static void printColumn(List<Column> columns) {
		for (Column column : columns) {
			System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
		}
	}



}

运行main方法

在更新后,每个字段都有一个update字段,如果值为true代表这个字段更新了,为false代表没更新。

拿到这些涉及数据库变更的事件之后,就可以根据需要去做数据的增量同步了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1629707.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

动手学深度学习——线性回归从零实现

1. 数据集 1.1 生成数据集 要训练模型首先要准备训练数据集&#xff0c;对于线性模型 yXwb&#xff0c;定义生成数据集的函数如下&#xff1a; def synthetic_data(w, b, num_examples): #save"""生成yXwb噪声"""# 从均值为0&#xff0c;标准…

Git重修系列 ------ Git的使用和常用命令总结

一、Git的安装和配置 git安装&#xff1a; Git - Downloads git首次配置用户信息&#xff1a; $ git config --global user.name "kequan" $ git config --global user.email kequanchanqq.com $ git config --global credential store 配置 Git 以使用本地存储机…

基于自注意力机制的长短期记忆神经网络(LSTM-SelfAttention)的回归预测

提示&#xff1a;MATLAB版本需要R2023a以上 基于自注意力机制的长短期记忆神经网络&#xff08;LSTM-SelfAttention&#xff09;是一种用于时序数据预测的模型。这个模型结合了两个不同的结构&#xff0c;即长短期记忆网络&#xff08;LSTM&#xff09;和自注意力机制&#xff…

解决HttpServletRequest中的InputStream/getReader只能被读取一次的问题

一、事由 由于我们业务接口需要做签名校验&#xff0c;但因为是老系统了签名规则被放在了Body里而不是Header里面&#xff0c;但是我们不能在每个Controller层都手动去做签名校验&#xff0c;这样不是优雅的做法&#xff0c;然后我就写了一个AOP&#xff0c;在AOP中实现签名校…

Cesium.js(1):Cesium.js简介

1 前言 现有的gis开发方向较流行的是webgis开发&#xff0c;其中Cesium是一款开源的WebGIS库&#xff0c;主要用于实时地球和空间数据的可视化和分析。它提供了丰富的地图显示和数据可视化功能&#xff0c;并能实现三维可视化开发。Cesium适用于地球科学研究、军事情报分析、航…

Java编程练习之final关键字

1.final类&#xff1a;不允许任何类继承&#xff0c;并且不允许其他人对这个类进行任何改动&#xff1b; 当被某个类设置为final类时&#xff0c;类中的所有方法都被隐式的设置为final形式&#xff0c;但是final类中的成员变量既可以被定义为final形式&#xff0c;又可以被定义…

【区块链】椭圆曲线数字签名算法(ECDSA)

本文主要参考&#xff1a; 一文读懂ECDSA算法如何保护数据 椭圆曲线数字签名算法 1. ECDSA算法简介 ECDSA 是 Elliptic Curve Digital Signature Algorithm 的简称&#xff0c;主要用于对数据&#xff08;比如一个文件&#xff09;创建数字签名&#xff0c;以便于你在不破坏它…

Maven的仓库、周期和插件

优质博文&#xff1a;IT-BLOG-CN 一、简介 随着各公司的Java项目入库方式由老的Ant改为Maven后&#xff0c;相信大家对Maven已经有了个基本的熟悉。但是在实际的使用、入库过程中&#xff0c;笔者发现挺多人对Maven的一些基本知识还缺乏了解&#xff0c;因此在此处跟大家简单地…

SpringCloud系列(19)--将服务消费者Consumer注册进Consul

前言&#xff1a;在上一章节中我们把服务提供者Provider注册进了Consul&#xff0c;而本章节则是关于如何将服务消费者Consumer注册进Consul 1、再次创建一个服务提供者模块&#xff0c;命名为consumerconsul-order80 (1)在父工程下新建模块 (2)选择模块的项目类型为Maven并选…

使用CubeMx配置GD32F303系列单片机进行DMA ADC

原理图查看 查原理图可以看到GD32F103C8T6的官方开发板GD32303C-START-V1.0的PA1没有接任何东西 使用PA1作为ADC端口 CubeMX配置ADC和时钟 配置ADC通道 启用循环模式 配置此通道ADC分频 配置ADC DMA为循环模式 配置时钟 生成项目 Keil里面的配置 选择对应的GD32型号 编译…

2024全新瀚海跑道:矢量图片迅速养号游戏玩法,每天一小时,日转现200

最初我注意到这种玩法&#xff0c;是因为最近在浏览各大平台的视频时&#xff0c;我发现了一种特殊类型的账号&#xff0c;其养号成功率高达90%。这些账号发布的视频内容和数据非常夸张&#xff0c;而且制作起来非常简单&#xff0c;任何人都可以轻松上手。这些账号主要发布矢量…

堆与优先队列——练习题

1. 数据流中的第 K 大元素 代码实现&#xff1a; 思路&#xff1a;创建一个大小为 k 的小顶堆&#xff0c;堆顶元素就是第 K 大元素 typedef struct {int *__data, *data;int size;int n; } KthLargest;#define swap(a, b) { \__typeof(a) __c (a); \(a) (b); \(b) __c; \ }…

C++ 笔试练习笔记【1】:字符串中找出连续最长的数字串 OR59

文章目录 OR59 字符串中找出连续最长的数字串题目思路分析实现代码 注&#xff1a;本次练习题目出自牛客网 OR59 字符串中找出连续最长的数字串 题目思路分析 首先想到的是用双指针模拟&#xff0c;进行检索比较输出 以示例1为例&#xff1a; 1.首先i遍历str直到遍历到数字&a…

字符串类型漏洞之updatexml函数盲注

UPDATEXML 是 MySQL 数据库中的一个函数&#xff0c;它用于对 XML 文档数据进行修改和查询。然而&#xff0c;当它被不当地使用或与恶意输入结合时&#xff0c;它可能成为 SQL 注入攻击的一部分&#xff0c;从而暴露敏感信息或导致其他安全漏洞。 在 SQL 注入攻击中&#xff0…

CentOS 9 (stream) 安装 nginx

1.我们直接使用安装命令 dnf install nginx 2.安装完成后启动nginx服务 # 启动 systemctl start nginx # 设置开机自启动 systemctl enable nginx# 重启 systemctl restart nginx# 查看状态 systemctl status nginx# 停止服务 systemctl stop nginx 3.查看版本确认安装成功…

Pytorch实现线性回归模型

在机器学习和深度学习的世界中&#xff0c;线性回归模型是一种基础且广泛使用的算法&#xff0c;简单易于理解&#xff0c;但功能强大&#xff0c;可以作为更复杂模型的基础。使用PyTorch实现线性回归模型不仅可以帮助初学者理解模型的基本概念&#xff0c;还可以为进一步探索更…

深信服超融合虚拟机备份报错显示准备备分镜像失败

问题&#xff1a;最近一段时间深信服超融合虚拟机在执行备份策略时总是报错&#xff0c;备份空间又还很富余。 解决办法&#xff1a; 1 删除备份失败虚拟机的所有备份 2 解绑该虚拟机的备份策略 可靠服务>>备份与CDP>> 找到备份策略>>点【编辑】>>…

刷机维修进阶教程---开机定屏 红字感叹号报错 写字库保资料 救砖 刷官方包保资料的步骤方法解析

在维修各种机型 中经常会遇到开机定屏 进不去系统,正常使用无故定屏进不去系统或者更新降级开机红色感叹号的一些故障机。但顾客需要报资料救砖的要求,遇到这种情况。我们首先要确定故障机型的缘由。是摔 还是更新降级 还是无故使用重启定屏等等。根据原因来对症解决。 通过…

springboot3整合redis

redis在我们的日常开发中是必不可少的&#xff0c;本次来介绍使用spring boot整合redis实现一些基本的操作&#xff1b; 1、新建一个spring boot项目&#xff0c;并导入相应的依赖&#xff1b; <dependency><groupId>org.springframework.boot</groupId><…

基于YOLOV8+Pyqt5无人机航拍太阳能电池板检测系统

1.YOLOv8的基本原理 YOLOv8是一种前沿的目标检测技术&#xff0c;它基于先前YOLO版本在目标检测任务上的成功&#xff0c;进一步提升了性能和灵活性&#xff0c;在精度和速度方面都具有尖端性能。在之前YOLO 版本的基础上&#xff0c;YOLOv8 引入了新的功能和优化&#xff0c;…