实时同步:使用 Canal 和 Kafka 解决 MySQL 与缓存的数据一致性问题

news2025/1/19 11:20:25

目录

1. 准备工作

2. 将需要缓存的数据存储 Redis

3. 监听 canal 存储在 Kafka Topic 中数据


1. 准备工作

1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启)

修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini

log-bin="HELONG-bin"
binlog_format=ROW     # 只能配置行模式, 因为 Cannal 不具备将SQL转化成数据的能力
binlog-do-db=aicloud    # 监控 AI Cloud 项目

如果要同步多个项目:

binlog-do-db=aicloud
binlog-do-db=aicloud2
binlog-do-db=aicloud3

2. 重启MySQL服务

3. 赋值数据同步权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. 安装并配置 Canal

下载地址:https://github.com/alibaba/canal/releases

① 修改canal.properties

canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9092

canal 监控 binlog 日志,binlog 日志的传输默认使用 MySQL 的复制协议(基于 TCP/IP),

可以使用写代码的方式直接从 MySQL 服务器读取数据,此处使用本地 kafka 进行存储。

② 修改instance.properties

canal.instance.mysql.slaveId=100   # 大于 1 即可
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=ai-cloud-canal-to-kafka

slaveId 表示从节点 id,canal 的执行原理就是伪装成一个从库去主库同步数据

(主节点的 slaveId = 1)

address 配置连接本地的 MySQL

topic 配置数据发送到 Kafka 的某个主题下

5. 拷贝 Jar 包到 lib

将 canal 下 plugin 下的所有 jar 包拷贝到 lib 目录下。

6. 删除 bin 目录下 startup.bat 里的参数

如果启动时报错:

Unrecognized VM option 'PermSize=128m'

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.

删除 -XX:PermSize=128m 参数即可。

7. 启动 canal

打开 cmd ,cd 到 bin 目录下,输入 startup.bat 回车

2. 将需要缓存的数据存储 Redis

此时我将这个查询列表接口的数据,存储在 Redis 中:

/**
 * 获取历史聊天记录(对话/绘图)
 *
 * @param type
 * @return {@link ResponseEntity }
 */
@RequestMapping("/list")
public ResponseEntity getHistoryList(Integer type, Integer model) {
    String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);
    Object list = redisTemplate.opsForValue().get(listCacheKey);
    if (ObjectUtil.isNull(list)) {
        LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());
        queryWrapper.eq(Answer::getType, type);
        queryWrapper.eq(Answer::getModel, model);
        queryWrapper.orderByDesc(Answer::getAid);
        List<Answer> answerList = answerService.list(queryWrapper);
        List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());
        Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));
        List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());
        // 缓存 1 天
        redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);
        return ResponseEntity.success(answerVoList);
    } else {
        return ResponseEntity.success(list);
    }
}
/**
 * 查询列表存储 Redis 缓存
 *
 * @param uid
 * @param model
 * @param type
 * @return {@link String }
 */
public static String getListCacheKey(Long uid, Integer model, Integer type) {
    return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
}

3. 监听 Kafka Topic 中数据并删除 Redis 缓存

首先对数据库中需要缓存的数据进行一些修改操作:

此时,使用 kafka ui(下载地址划到最底下),刷新 kafka 对应 topic 下的 message,就可以看到当前所作出的修改:

执行修改操作:将 “如何学习Spring???”修改成 “如何学习Spring??”

执行删除操作:

由此可见,对数据库的每一个修改操作,都是对应固定格式的一个数据,所以可以监听对应的  topic 并针对 data 中的数据进行一个提取,得到一个  cacheKey,然后删除对应的缓存,使得下一次的查询去访问数据库,并同步缓存。

【代码示例】

/**
 * canal 监控 binlog 日志,将修改的数据存储 kafka topic 中
 * 监听 kafka topic 中的数据
 *
 * @param data
 * @param ack
 * @throws JsonProcessingException
 */
@KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {
    HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);
    if (map.isEmpty()) {
        ack.acknowledge();
        return;
    }
    // 匹配上对应的数据库和数据表
    if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&
            KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {
        // 更新缓存 
        List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);
        if (!CollectionUtils.isEmpty(list)) {
            for (Map<String, Object> answerMap : list) {
                String answerListCacheKey = RedisUtil.getListCacheKey(
                        Long.valueOf(answerMap.get("uid").toString()),
                        Integer.parseInt(answerMap.get("model").toString()),
                        Integer.parseInt(answerMap.get("type").toString()));
                // 删除缓存,让下一次查询走数据库,并同步缓存
                redisTemplate.delete(answerListCacheKey);
            }
        }
    }
    //  手动确认应答
    ack.acknowledge();
}
/**
 * canal 同步数据到 kafka
 */
public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";


/**
 * 数据库,缓存数据一致性的
 */

public static final String DATABASE_KEY = "database";

public static final String TABLE_KEY = "table";

public static final String DATA_KEY = "data";

public static final String TARGET_DATABASE = "aicloud";

public static final String TARGET_TABLE = "answer";

【补充】

kafka ui 下载地址:​​​​​​https://github.com/provectus/kafka-ui/tags

修改配置

kafka:
  clusters:
    - name: kafka3_cluster
      bootstrapServers: 127.0.0.1:9092

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1949309.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32——GPIO(LED闪烁)

一、什么是GPIO&#xff1f; GPIO&#xff08;通用输入输出接口&#xff09;&#xff1a; 1.GPIO 功能概述 GPIO 是通用输入/输出&#xff08;General Purpose I/O&#xff09;的简称&#xff0c;既能当输入口使用&#xff0c;又能当输出口使用。端口&#xff0c;就是元器件…

HTML常用的转义字符——怎么在网页中写“<div></div>”?

一、问题描述 如果需要在网页中写“<div></div>”怎么办呢&#xff1f; 使用转义字符 如果直接写“<div></div>”&#xff0c;编译器会把它翻译为块&#xff0c;类似的&#xff0c;其他的标签也是如此&#xff0c;所以如果要在网页中写类似于“<div…

docker 构建 mongodb

最近需要在虚拟机上构建搭建mongo的docker容器&#xff0c;搞了半天老有错&#xff0c;归其原因&#xff0c;是因为现在最新的mango镜像的启动方式发生了变化&#xff0c;故此现在好多帖子&#xff0c;就是错的。 ok&#xff0c;话不多说&#xff1a; # 拉取最新镜像&#xf…

SpringBoot 使用easypoi.excel实现导入解析数据,并结合数据字典实现对数据的校验

在日常开发工作中避免不了的功能需求&#xff1a;导入Excel文件&#xff0c;然而导入文件流操作、对数据的校验有是件麻烦事&#xff0c;自从接触了easypoi后&#xff0c;觉得封装的很好&#xff0c;很简洁。 使用的主要依赖如下&#xff1a; <dependency><groupId&…

Unity3D结合AI教育大模型 开发AI教师 AI外教 AI英语教师案例

自2022年底ChatGPT引爆全球之后&#xff0c;大模型技术便迎来了一段崭新的快速发展期&#xff0c;由其在GPT4.0发布后&#xff0c;AI与教育领域结合产品研发、已成为教育AI科技竞争的新高地、未来产业的新赛道、经济发展的新引擎和新产品的诞生地。 据不完全统计&#xff0c;目…

代码随想录 day 22 回溯

第七章 回溯算法part01 理论基础 其实在讲解二叉树的时候&#xff0c;就给大家介绍过回溯&#xff0c;这次正式开启回溯算法&#xff0c;大家可以先看视频&#xff0c;对回溯算法有一个整体的了解。 题目链接/文章讲解&#xff1a;https://programmercarl.com/%E5%9B%9E%E6%B…

pdf格式过大怎么样变小 pdf文件过大如何缩小上传 超实用的简单方法

面对体积庞大的 PDF 文件&#xff0c;我们常常需要寻找有效的方法来缩减其大小。这不仅能够优化存储空间&#xff0c;还能提升文件的传输和打开速度。PDF文件以其稳定性和跨平台兼容性成为工作和学习中的重要文件格式。然而&#xff0c;当我们需要通过邮件发送或上传大文件时&a…

力扣94题(java语言)

题目 思路 使用一个栈来模拟递归的过程&#xff0c;以非递归的方式完成中序遍历(使用栈可以避免递归调用的空间消耗)。 遍历顺序步骤&#xff1a; 遍历左子树访问根节点遍历右子树 package algorithm_leetcode;import java.util.ArrayList; import java.util.List; import…

无人机之环保监控篇

随着科技的不断进步&#xff0c;无人机作为一种创新的技术手段&#xff0c;在环保监控领域发挥着越来越重要的作用。 一、覆盖范围广 无人机能够轻松覆盖广阔的地理区域&#xff0c;无论是偏远的山区、广袤的森林还是大型的工业园区。相比传统的地面检测方式&#xff0c;其不…

vue3 常用的知识点

setup:容许在script当中书写组合式API 并且vue3的template不再要求唯一的根元素 <script setup>const name app; </script>组合式API的用法&#xff1a; 可以直接在script标签中定义变量或者函数&#xff0c;然后直接在template当中使用 <template>{{mes…

机器学习 | 回归算法原理——多重回归

Hi&#xff0c;大家好&#xff0c;我是半亩花海。接着上次的多项式回归继续更新《白话机器学习的数学》这本书的学习笔记&#xff0c;在此分享多重回归这一回归算法原理。本章的回归算法原理基于《基于广告费预测点击量》项目&#xff0c;欢迎大家交流学习&#xff01; 目录 一…

从零入门 AI for Science(AI+药物) #Datawhale AI 夏令营

使用平台 我的Notebook 魔搭社区 https://modelscope.cn/my/mynotebook/preset 主要操作 运行实例&#xff0c;如果有时长尽量选择方式二&#xff08;以下操作基于方式二的实例实现&#xff09; 创建文件夹&#xff0c;并重命名为 2.3siRNA 上传两个文件 到文件夹&#…

android手势监听

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 未经允许不得转载 目录 一、导读二、概览三、使用四、 如何实…

数据库窗口函数实战

目录 前言 窗口函数语法 创建测试表和数据 使用示例 PARTITION BY 窗口函数 ROW_NUMBER RANK DENSE_RANK RANGE ROWS 前言 SQL 具有很高的灵活性&#xff0c;可以根据需求进行复杂的数据查询和分析&#xff0c;支持多表联合查询&#xff08;join&#xff09;、排序…

[Unity] ShaderGraph实现镜头加速线/残血效果 URP

效果如下所示&#xff1a;残血状态时&#xff0c;画面会压暗角&#xff0c;并出现速度线营造紧迫感。 使用到的素材如下&#xff0c;换别的当然也可以。[这是张白色的png放射图&#xff0c;并非皇帝的新图hhh] 这个效果的实现逻辑&#xff0c;其实就是利用time向圆心做透明度的…

学习笔记-系统框图简化求传递函数公式例题

简化系统结构图求系统传递函数例题 基础知识回顾 第四讲 控制系统的方框图 (zhihu.com) 「自控原理」2.3 方框图的绘制及化简_方框图化简-CSDN博客 自动控制原理笔记-结构图及其等效变换_结构图等效变换-CSDN博客 例子一 「自控原理」2.3 方框图的绘制及化简_方框图化简-CS…

【ARM】MDK-ARM软件开发工具的最终用户许可协议获取

【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 了解MDK-ARM系列产品内软件开发工具的最终用户许可协议的获取。 2、 问题场景 对于部分外企客户需要软件开发工具的最终用户许可协议作为产品资料&#xff0c;以便附录并说明。 3、软硬件环境 1&#xff09;、软件…

Axure怎么样?全面功能评测与用户体验分析!

软件 Axure 曾经成为产品经理必备的原型设计工具&#xff0c;被认为是专门为产品经理设计的工具。但事实上&#xff0c;软件 Axure 的使用场景并不局限于产品经理构建产品原型。UI/UX 设计师还可以使用 Axure 软件构件应用程序 APP 原型&#xff0c;网站设计师也可以使用 Axure…

快速上手,spring boot3整合task实现定时任务

在已经上线的项目中&#xff0c;定时任务是必不可少的。基于spring boot自动装配的原理&#xff0c;我们要集成task定时任务还是非常简单的。只需要简单的两步就可以实现。 1、创建一个spring boot项目&#xff0c;并在项目的启动类&#xff08;也不一定非要是启动类&#xff…

二手车小程序

本文来自&#xff1a;FastAdmin二手车小程序 - 源码1688 一款基于ThinkPHPFastAdmin开发的原生微信小程序二手车管理系统。 前端小程序码&#xff1a; 后台演示地址&#xff1a; https://facars.site100.cn/OHNYSKzuba.php/carswxsys/sysinit?refaddtabs