基于SpringBoot实现MySQL与Redis的数据最终一致性

news2025/1/15 12:42:36

问题场景

在并发场景下,MySQL和Redis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起,导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三种:

  • 强一致性:写入何值,读出何值,但在实现中,性能较差。
  • 弱一致性:写入新数据后,承诺在某个时间级别(分、秒、毫秒)后,达到数据一致。
  • 最终一致性:写入新数据后,承诺在规定时间内达到数据一致。

解决方案

强一致性: 强一致性解决方案在高并发场景下实现过于苛刻,本案例暂不讨论。

弱一致性: 一致性的解决方案可以使用“先写MySQL,再删除Redis”策略,这种方案在极限条件下有不一致的可能性,但结合需求和技术实现可以综合评判。弱一致性的应用场景如:社交平台点赞功能,用户可以实时看到点赞的更新,尽管MySQL和Redis可能存在短暂的数据不一致。

最终一致性: 采用“先写MySQL,通过MySQL的Binlog特性,异步写入Redis”。这种方案一般适用于库存、金融等业务场景,但是需要建立相关失败重试、告警、补偿机制,以及容灾措施。

在本案例中,弱一致性采用 Cache Aside 方案,最终一致性采用阿里巴巴开源组件 canal 实现。

Cache Aside

  1. 该方案在读取数据库时,首先从缓存中查询数据库:
    • 如果缓存中存在数据,则直接返回给应用程序。
    • 如果缓存中不存在数据,则从数据库中读取数据,并将数据存储到缓存中,然后返回给应用程序。
  1. 写入数据时,先更数据库的数据,当数据库更新成功后,再删除缓存中的数据。

Cache Aside注意事项
  • 缓存失效:缓存中的数据可能会过期或失效,需要考虑设置合适的缓存过期时间,或使用合适的缓存失效策略(如LRU)来管理缓存中的数据。
  • 缓存穿透:当请求查询一个不存在的数据时,会导致缓存层无法命中,从而直接访问数据库。为了避免缓存穿透问题,可以使用空值缓存或布隆过滤器等技术来减轻数据库的负载。

综上所述,Cache Aside方案适用于读取频率较高、对数据实时性要求不高的场景,通过合理地使用缓存来提高系统性能和扩展性,并通过维护数据的一致性来避免数据不一致的问题。

Cache Aside demo

基于Cache Aside实现点赞功能。

实体类信息

public class Like {
    private String postId;
    private int likeCount;

    // 构造函数、getter和setter方法
}

逻辑层

@Service
public class LikeService {
    private final LikeRepository likeRepository;
    private final RedisUtils redisUtils;

    public LikeService(LikeRepository likeRepository, RedisUtils redisUtils) {
        this.likeRepository = likeRepository;
        this.redisUtils = redisUtils;
    }

    public Like getLikeInfo(String postId) {
        String cacheKey = "like:" + postId;

        // 从缓存中获取点赞信息
        Like like = (Like) redisUtils.get(cacheKey);

        // 如果缓存中不存在,则从持久层(数据库)获取
        if (like == null) {
            like = likeRepository.findByPostId(postId);

            // 如果数据库中存在数据,则保存到缓存中
            if (like != null) {
                redisUtils.set(cacheKey, like);
            }
        }

        // 如果点赞信息为空,则初始化为0
        if (like == null) {
            like = new Like(postId, 0);
        }

        return like;
    }

    public void addLike(String postId) {
        String cacheKey = "like:" + postId;

        // 在持久层(数据库)新增点赞信息
        Like like = likeRepository.findByPostId(postId);

        if (like == null) {
            like = new Like(postId, 1);
        } else {
            like.setLikeCount(like.getLikeCount() + 1);
        }

        likeRepository.save(like);

        // 更新缓存中的数据
        redisUtils.set(cacheKey, like);
    }
}

canal

引用canal官方说明:

canal [kə’næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

前置知识:MySQL主从复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
环境搭建

需要的开发环境:

  • MySQL
  • Redis
  • Canal

特别说明:canal只支持JDK 8和JDK 11,如果您在本地物理机安装,请切换JDK默认版本。笔者更建议您使用Docker安装开发环境,由于canal安装后需要修改的配置较多,可以通过Docker-Compose安装。

那么,麻烦ChatGPT写一个Docker-Compose文件吧:

  • version请按本地安装的Docker-Compose版本定义。
  • Docker-Compose安装请自行查询。
version: '2.4'

services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    restart: false
    environment:
      MYSQL_ROOT_PASSWORD: root
    ports:
      - "33060:3306"
    volumes:
      - ./mysql-data:/var/lib/mysql

  canal:
    image: canal/canal-server:v1.1.5
    container_name: canal
    restart: false
    ports:
      - "11111:11111"
      - "11112:11112"
    depends_on:
      - mysql
    environment:
      - canal.destinations=example
      - canal.instance.mysql.slaveId=1234
      - canal.instance.master.address=mysql:3306
      - canal.instance.dbUsername=root
      - canal.instance.dbPassword=root
      - canal.instance.connectionCharset=UTF-8
      - canal.instance.tsdb.enable=false
      - canal.instance.gtidon=false
      - canal.instance.filter.regex=.*
      - canal.instance.filter.black.regex=mysql\.slave_.*
      
      
  redis:
    image: redis:latest
    restart: always
    ports:
      - 6379:6379
    volumes:
      - ./redis_data:/data

将文件命名为:docker-compose.yml,开始安装。

docker-compose up -d

本案例使用balance余额表来演示,数据库表设计如下:

CREATE TABLE `balance` (
  `id` varchar(50) NOT NULL COMMENT '主键',
  `account` varchar(50) NOT NULL COMMENT '账户',
  `amount` decimal(10,2) NOT NULL COMMENT '金额',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 
COMMENT='余额表';
开发环境
  • JDK 17
  • SpringBoot 3.1.2
  • MyBatis-Plus 3.5.3.1
  • druid
  • lettuce

开发环境根据您的实际需要选择即可。

环境启动后,进入编码阶段。

/**
 * @author: liu_pc
 * Date:        2023/8/25
 * Description: 余额信息变更Redis变成处理类
 * Version:     1.0
 */
@Component
public class BalanceRedisProcessorService implements EntryHandler<Balance>, Runnable {

    private final Logger logger = LoggerFactory.getLogger(BalanceRedisProcessorService.class);

    private final RedisUtils redisUtils;

    private final CanalConfig canalConfig;

    private final Executor executor;

    @Value("${canal.server.open}")
    private boolean open;

    @Autowired
    public BalanceRedisProcessorService(RedisUtils redisUtils,
                                        CanalConfig canalConfig,
                                        @Qualifier("ownThreadPoolExecutor") Executor executor) {
        this.redisUtils = redisUtils;
        this.canalConfig = canalConfig;
        this.executor = executor;
    }


    @PostConstruct
    public void init() {
        Map<String, String> mainMdcContext = Maps.newHashMap();
        mainMdcContext.put("canal-thread", "balance-redis-processor-service");
        MDC.setContextMap(mainMdcContext);
        executor.execute(this);
        logger.info("MySQL-Balance数据自动同步到Redis:线程已经启动");
    }


    @Override
    public void run() {
        CanalConnector canalConnector = canalConfig.canalConnector();
        canalConnector.connect();
        // 回滚到未进行ack的地方
        canalConnector.rollback();
        try {
            while (open) {
                // 获取数据 每次获取一百条改变数据
                Message message = canalConnector.getWithoutAck(100);
                //获取这条消息的id
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    Thread.sleep(1000);
                    continue;
                }

                // 处理数据
                for (CanalEntry.Entry entry : message.getEntries()) {
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        CanalEntry.EventType eventType = rowChange.getEventType();

                        if (eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.DELETE) {
                            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                                String tableName = entry.getHeader().getTableName();

                                // 判断是否是 Balance 表的 amount 字段变更
                                if ("balance".equals(tableName)) {
                                    StringBuilder redisKey = new StringBuilder("balance:");
                                    for (CanalEntry.Column column : columns) {
                                        logger.info("Balance changed in 'balance' dataInfo: {}", column);
                                        if ("id".equals(column.getName())) {
                                            String changeId = column.getValue();
                                            logger.info("当前变更id为:{}", changeId);
                                            redisKey.append(changeId);
                                        }
                                        if ("amount".equals(column.getName())) {
                                            String changeValue = column.getValue();
                                            logger.info(changeValue);
                                            redisUtils.set(redisKey.toString(), changeValue);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                // 确认消费完成这条消息
                canalConnector.ack(message.getId());
                logger.info("消费成功");
            }
        } catch (Exception e) {
            logger.warn("canal-消费失败");
        } finally {
            // 关闭连接
            canalConnector.disconnect();
        }
    }
}
测试

使用接口调用或者手动改库的方式,制造数据变更,查看日志打印情况:

Redis数据:

完成。

我已将canal实现数据同步代码开源,请自行下载领取,笔者不介意您宝贵的Star,如果能帮到您,十分荣幸。

mdc_logback

同时,如果您对笔者其他文章感兴趣,可以扫一扫关注笔者的公众号:种颗代码技术树

公众号文章更新更及时,以及一些程序员周边相关更新。

感谢您阅读到这里,不胜感激。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/935614.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《深入理解Java虚拟机》读书笔记:方法调用

方法调用并不等同于方法执行&#xff0c;方法调用阶段唯一的任务就是确定被调用方法的版本&#xff08;即调用哪一个方法&#xff09;&#xff0c;暂时还不涉及方法内部的具体运行过程。在程序运行时&#xff0c;进行方法调用是最普遍、最频繁的操作&#xff0c;但前面已经讲过…

Nginx详解 一:编译安装Nginx和Nginx模块

文章目录 1.HTTP 和 Nginx1.1 Socket套接字1.2 HTTP工作机制1.2.1一次http事务1.2.2 资源类型1.2.3提高HTTP连接性能 2. I/O模型2.1 I/O模型相关概念2.2 网络I/O模型2.2.1 **阻塞型** **I/O** 模型&#xff08;blocking IO&#xff09;2.2.2 **非阻塞型** **I/O** **模型** **(…

在React项目是如何捕获错误的?

文章目录 react中的错误介绍解决方案后言 react中的错误介绍 错误在我们日常编写代码是非常常见的 举个例子&#xff0c;在react项目中去编写组件内JavaScript代码错误会导致 React 的内部状态被破坏&#xff0c;导致整个应用崩溃&#xff0c;这是不应该出现的现象 作为一个框架…

Java基础 数据结构一【栈、队列】

什么是数据结构 数据结构是计算机科学中的一个重要概念&#xff0c;用于组织和存储数据以便有效地进行访问、操作和管理。它涉及了如何在计算机内存中组织数据&#xff0c;以便于在不同操作中进行查找、插入、删除等操作 数据结构可以看作是一种数据的组织方式&#xff0c;不…

[maven]关于pom文件中的<relativePath>标签

关于pom文件中的<relativePath>标签 为什么子工程要使用relativePath准确的找到父工程pom.xml.因为本质继承就是pom的继承。父工程pom文件被子工程复用了标签。&#xff08;可以说只要我在父工程定义了标签&#xff0c;子工程就可以没有&#xff0c;因为他继承过来了&…

Kotlin数据结构

数据结构基础 什么是数据结构 在计算机科学中&#xff0c;数据结构&#xff08;Data Structure&#xff09;是计算机中存储、组织数据的方式。数据结构是各种编程语言的基础。 一些使用场景 不同的数据结构适用于不同的应用场景。比如HashMap与ConcurrentHashMap&#xff0…

【太多网工对NAT还存在这4种误解!你是其中一个吗?】

NAT是解决公网地址不够用大家最熟悉的网络技术之一&#xff0c;而NAT最依赖的是NAT translation表项&#xff0c;至于NAT的概念和背景这里不再解释&#xff0c;网络上有很多关于此的类似介绍&#xff0c;自己搜索即可。下面主要是针对大家对NAT的一些误解进行分析。 1 误解一…

leetcode3. 无重复字符的最长子串(滑动窗口 - java)

滑动窗口 无重复字符的最长子串滑动窗口 上期经典 无重复字符的最长子串 难度 - 中等 3. 无重复字符的最长子串 给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。 示例 1: 输入: s “abcabcbb” 输出: 3 解释: 因为无重复字符的最长子串是 “abc…

0201hdfs集群部署-hadoop-大数据学习

文章目录 1 前言2 集群规划3 hadoop安装包上传与安装3.1 上传解压 4 hadoop配置5 从节点同步和环境变量配置6 创建用户7 集群启动8 问题集8.1 Invalid URI for NameNode address (check fs.defaultFS): file:/// has no authority. 结语 1 前言 下面我们配置下单namenode节点h…

基于Django的博客管理系统

1、克隆仓库https://gitee.com/lylinux/DjangoBlog.git 若失效&#xff1a;https://gitee.com/usutdzxy/DjangoBlog.git 2、环境安装 pip install -Ur requirements.txt3、修改djangoblog/setting.py 修改数据库配置&#xff0c;其他的步骤就按照官方文档。 DATABASES {def…

无涯教程-机器学习 - Jupyter Notebook函数

Jupyter笔记本基本上为开发基于Python的数据科学应用程序提供了一个交互式计算环境。它们以前称为ipython笔记本。以下是Jupyter笔记本的一些功能,使其成为Python ML生态系统的最佳组件之一- Jupyter笔记本可以逐步排列代码,图像,文本,输出等内容,从而逐步说明分析过程。 它有…

【js案例】滚动效果实现及简单动画函数抽离

目录 &#x1f31f;效果 &#x1f31f;实现思路 &#x1f31f;实现方法 HTML&CSS代码 初始化 滚动效果 完整JS代码 &#x1f31f;抽离动画函数 函数的简单使用 小案例一 小案例二 &#x1f31f;效果 &#x1f31f;实现思路 要实现自动滚动&#xff0c;无非就…

高等数学上册 第十章 重积分 第十一章 曲线积分与曲面积分 知识点总结

重积分 二重积分计算法&#xff1a; 直角坐标下&#xff1a;化为二次积分 { 如果图形是 X Y 型&#xff0c;则都可以&#xff0c;但要考虑哪个计算不定积分方便 如果图形既不是 X 也不是 Y 型&#xff0c;则要拆分 极坐标下&#xff1a; ∬ f ( x , y ) d x d y ∬ f ( ρ cos…

基于适应度相关算法优化的BP神经网络(预测应用) - 附代码

基于适应度相关算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码 文章目录 基于适应度相关算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码1.数据介绍2.适应度相关优化BP神经网络2.1 BP神经网络参数设置2.2 适应度相关算法应用 4.测试结果&…

Python学习之一 基于交互式解释器的简单Python编程

在很奇葩的Deepin下Miniconda安装之旅 中完成了Deepin系统下的Miniconda安装&#xff0c;在使用Miniconda 中完成了Miniconda的使用。今天&#xff0c;将开始学习Python编程。 (一) 为Python编程学习创建虚拟环境 首先创建虚拟环境&#xff0c;选择Python3.7。 conda create…

用于C++律动运动的中央模式生成器

用于C律动运动的中央模式生成器 一、说明 本篇讲述关于生物模型的神经网络&#xff0c; 中央模式生成器的简单神经网络的一个例子是半中心振荡器&#xff1b;该系统分成两个组成&#xff0c;信号层和物理层。新概念仓本模型&#xff0c;以及龙格库塔法的方程解法&#xff0c;总…

6.跑一下Triton官方教程

1.模型部署 首先拉取官方示例代码 git clone --recursive https://github.com/triton-inference-server/tutorials.git cd tutorials/Conceptual_Guide/Part_1-model_deployment 1.下载文本检测模型 wget https://www.dropbox.com/s/r2ingd0l3zt8hxs/frozen_east_text_dete…

裸露土堆识别算法

裸露土堆识别算法首先利用图像处理技术&#xff0c;提取出图像中的土堆区域。裸露土堆识别算法首通过计算土堆中被绿色防尘网覆盖的比例&#xff0c;判断土堆是否裸露。若超过40%的土堆没有被绿色防尘网覆盖&#xff0c;则视为裸露土堆。当我们谈起计算机视觉时&#xff0c;首先…

React与Vue:两大前端巨头的深度对决

引言 在当今的前端开发领域&#xff0c;React和Vue无疑是两大巨头。它们各自有着独特的历史和哲学&#xff0c;但都为开发者提供了强大的工具来构建高效、响应式的web应用。这篇文章将深入探讨这两个框架的差异&#xff0c;帮助开发者更好地理解它们的优势和劣势。 React与Vu…

Python“牵手”唯品会商品列表数据,关键词搜索唯品会API接口数据,唯品会API接口申请指南

唯品会平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范&#xff0c;唯品会API接口是指通过编程的方式&#xff0c;让开发者能够通过HTTP协议直接访问唯品会平台的数据&#xff0c;包括商品信息、店铺信息、物流信息等&#xff0c;从而实现唯…