Redis学习(十)|使用消息队列的重试机制实现 MySQL 和 Redis 的数据一致性

news2025/1/14 4:59:50

文章目录

  • 介绍
  • 原理
  • 整体方案
  • 实现步骤
  • 示例代码
  • 总结
  • 其他:Kafka 重试策略配置
    • 1. 生产者重试策略配置
    • 2. 消费者重试策略配置

介绍

在分布式系统中,保持 MySQL 和 Redis 之间的数据一致性是至关重要的。为了确保数据的一致性,我们通常采取先更新数据库,再删除缓存的方案。然而,在实际应用中,由于网络问题、服务故障等原因,可能会导致数据库更新成功而缓存删除失败,进而导致数据不一致。为了解决这个问题,我们可以引入消息队列的重试机制,以确保缓存删除成功。

原理

重试机制是一种容错机制,用于在消息发送失败或者处理失败时进行重试。通过将数据更新操作封装成消息,并发送到消息队列中,在消费者处理消息时进行重试,可以提高系统的可靠性和稳定性。我们将使用消息队列的重试机制来实现 MySQL 和 Redis 的数据一致性。

整体方案

整体方案如下:

  1. 应用程序首先将数据更新操作发送到消息队列中。
  2. 消费者从消息队列中获取消息,并根据消息中的数据删除 Redis 中的缓存数据。
  3. 如果应用删除缓存失败,可以从消息队列中重新读取数据,然后再次删除缓存,这个就是重试机制。当然,如果重试超过的一定次数,还是没有成功,就需要向业务层发送报错信息了。
  4. 如果删除缓存成功,就要把数据从消息队列中移除,避免重复操作,否则就继续重试。

实现步骤

以下是使用消息队列的重试机制实现 MySQL 和 Redis 数据一致性的基本步骤:

  1. 将数据更新操作封装成消息:在应用程序中,将数据更新操作封装成消息,并发送到消息队列中。消息中应包含数据更新操作的类型(如插入、更新或删除)以及相关的数据。
  2. 消费者消息处理和失败重试:消费者从消息队列中获取消息,并根据消息中的数据更新操作来删除 Redis 中的缓存数据。消息消费失败后自动进行重试。可以根据重试次数和重试间隔来配置重试机制,例如指数退避策略。
  3. 消息确认机制:消费者在成功处理消息后,需要发送确认消息给消息队列,告知消息队列可以删除或标记消息为已处理。这样可以确保消息在成功处理后不会被重新处理,避免重复处理的情况。

示例代码

以下是一个简单的示例代码,演示了如何使用 Java 实现消息队列的重试机制,以确保 MySQL 和 Redis 的数据一致性:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MySQLToKafka {

    private static final String TOPIC_NAME = "mysql_updates";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "cache-deletion-group";

    public static void main(String[] args) {
        // 模拟 MySQL 更新后发送消息到 Kafka
        sendMySQLUpdateToKafka("data_update");

        // 模拟从 Kafka 拉取消息删除 Redis 缓存
        pullMessageFromKafkaAndDeleteCache();
    }

    // 发送 MySQL 更新消息到 Kafka
    private static void sendMySQLUpdateToKafka(String message) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);

        try {
            producer.send(record).get();
            System.out.println("Message sent to Kafka successfully: " + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    // 从 Kafka 拉取消息并删除 Redis 缓存
    private static void pullMessageFromKafkaAndDeleteCache() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                    if (deleteCacheFromRedis(record.value())) {
                        System.out.println("Cache deleted from Redis successfully.");
                        // 处理消息成功后手动提交偏移量
                        consumer.commitAsync();
                    } else {
                        System.out.println("Cache deletion from Redis failed. Kafka will retry.");
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    // 模拟删除 Redis 缓存
    private static boolean deleteCacheFromRedis(String message) {
        // 这里省略删除 Redis 缓存的逻辑,直接模拟删除成功和失败
        // 模拟删除成功的概率为 0.7
        if (Math.random() < 0.7) {
            return true;
        }
        return false;
    }
}

以上代码完成了以下几个功能:

  1. 发送 MySQL 更新消息到 Kafka: sendMySQLUpdateToKafka 方法模拟了 MySQL 更新后发送消息到 Kafka 的过程。它使用 Kafka 生产者将消息发送到指定的 Kafka 主题中。
  2. 从 Kafka 拉取消息并删除 Redis 缓存: pullMessageFromKafkaAndDeleteCache 方法模拟了从 Kafka 拉取消息并删除 Redis 缓存的过程。它使用 Kafka 消费者订阅指定的 Kafka 主题,并轮询获取消息。对于每条消息,它尝试删除对应的 Redis 缓存。如果删除失败,就会打印一条消息表示失败,然后 Kafka 将自动重试该消息。只有在成功删除缓存后,才会手动提交偏移量。
  3. 模拟删除 Redis 缓存: deleteCacheFromRedis 方法用于模拟删除 Redis 缓存的过程。它返回一个布尔值,表示缓存删除操作的成功或失败。在实际应用中,这个方法应该被替换为真正的删除 Redis 缓存的逻辑。

通过这样的流程,模拟了一个简单的数据更新、消息发送、消息消费、缓存删除的完整流程,并且在处理消息失败时利用 Kafka 的自动重试机制进行了处理。

总结

通过引入消息队列的重试机制,可以有效地实现 MySQL 和 Redis 的数据一致性。使用重试机制,可以确保数据在 MySQL 更新后 Redis 的对应缓存能够成功删除,从而保持数据的一致性。这种方法适用于需要处理大量数据更新和异步消息传输的场景,同时也提高了系统的可靠性和稳定性。

其他:Kafka 重试策略配置

1. 生产者重试策略配置

对于 Kafka 生产者,可以通过配置以下参数来定义重试策略:

  • retries: 设置生产者在发生可重试的异常时重试的最大次数。默认值为 2147483647(即最大整数)。
  • retry.backoff.ms: 设置生产者在重试之间等待的时间。默认值为 100 毫秒。

示例配置:

# 设置最大重试次数为 3 次
retries=3
# 设置重试之间的等待时间为 500 毫秒
retry.backoff.ms=500

2. 消费者重试策略配置

对于 Kafka 消费者,可以通过以下参数来定义重试策略:

  • enable.auto.commit: 指定消费者是否自动提交偏移量。默认为 true,表示自动提交。
  • auto.commit.interval.ms: 如果启用了自动提交偏移量,可以通过该参数设置自动提交的间隔时间。默认值为 5000 毫秒。
  • max.poll.interval.ms: 设置消费者在拉取消息之间的最大时间间隔。如果消费者在此间隔内没有发送心跳,将被认为失败,并且将其分区重新分配给其他消费者。默认值为 300000 毫秒(5 分钟)。
  • max.poll.records: 设置消费者在单次调用 poll 方法中拉取的最大记录数。默认值为 500 条。

示例配置:

# 禁用自动提交偏移量
enable.auto.commit=false
# 设置自动提交偏移量的间隔时间为 1000 毫秒
auto.commit.interval.ms=1000
# 设置拉取消息之间的最大时间间隔为 10 秒
max.poll.interval.ms=10000
# 设置单次 poll 方法拉取的最大记录数为 100 条
max.poll.records=100

通过以上配置,可以定制 Kafka 生产者和消费者的重试策略,以适应不同的业务需求和性能要求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1649763.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Three.js基础学习】15.scroll-based-animation

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 前言 课程要点 结合html等场景 做滚动动画 1.遇到的问题&#xff0c; 在向下滚动时&#xff0c;下方会显白&#xff08;部分浏览器&#xff09; 解决&#xff1a;alpha:true …

什么是多模态大模型,有了大模型,为什么还要多模态大模型?

随着人工智能技术的愈演愈烈&#xff0c;其技术可以说是日新月异&#xff0c;每隔一段时间就会有新的技术和理念被创造出来&#xff1b;而多模态大模型也是其中之一。 什么是多模态 想弄明白什么是多模态大模型&#xff0c;那么首先就要弄明白什么是多模态。 简单来说&#x…

shell常用文件处理命令

1. 解压 1.1 tar 和 gz 文件 如果你有一个 .tar 文件,你可以使用以下命令来解压: tar -xvf your_file.tar在这个命令中,-x 表示解压缩,-v 表示详细输出(可选),-f 后面跟着要解压的文件名。 如果你的 .tar 文件同时被 gzip 压缩了(即 .tar.gz 文件),你可以使用以下…

PHP 匿名函数和闭包在数据结构中的应用

匿名函数和闭包在数据结构处理中的应用php 中的匿名函数和闭包可用于处理数组、链表和队列等数据结构。针对数组&#xff0c;匿名函数可用于过滤元素&#xff1b;针对链表&#xff0c;闭包可用于创建节点&#xff1b;针对队列&#xff0c;匿名函数和闭包可实现 fifo 队列操作。…

2005-2021年全国各地级市生态环境注意力/环保注意力数据(根据政府报告文本词频统计)

2005-2021年全国各地级市生态环境注意力/环保注意力数据&#xff08;根据政府报告文本词频统计&#xff09; 2005-2021年全国各地级市生态环境注意力/环保注意力数据&#xff08;根据政府报告文本词频统计&#xff09; 1、时间&#xff1a;2005-2021年 2、范围&#xff1a;2…

初始Linux(基础命令)

前言&#xff1a; 我们不能总沉浸在编程语言中&#xff0c;虽然代码能力提升了&#xff0c;但是也只是开胃小菜。我们要朝着更高的方向发展。 最近小编一直在刷力扣&#xff0c;以至于博客更新的比较少。今天就带各位开始学习全新的知识——Linux.至于为啥要学&#xff1f; Lin…

基于FPGA的多路彩灯控制器VHDL代码Quartus仿真

名称&#xff1a;基于FPGA的多路彩灯控制器VHDL代码Quartus仿真&#xff08;文末获取&#xff09; 软件&#xff1a;Quartus 语言&#xff1a;VHDL 代码功能&#xff1a; 多路彩灯控制器 综合训练内容要求 设计一台基于FPGA的多路彩灯控制器的设计。要求如下 1.彩灯从左…

IOS自动化—将WDA打包ipa批量安装驱动

前言 CSDN&#xff1a; ios自动化-Xcode、WebDriverAgent环境部署 ios获取原生系统应用的包 如果Mac电脑没有配置好Xcode相关环境,可以参考以上文章。 必要条件 Mac电脑&#xff0c;OS版本在12.4及以上&#xff08;低于这个版本无法安装Xcode14&#xff0c;装不了Xcode14就…

20230507,LIST容器

学了又忘学了又忘&#xff0c;明知道会忘又不想复习又还得学 LIST容器 1.1 基本概念 链表是一种物理存储单元上非连续的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接实现的&#xff1b;链表由一系列结点组成 结点&#xff1a;一个是存储数据元素的数据域&a…

《ESP8266通信指南》12-Lua 固件烧录

往期 《ESP8266通信指南》11-Lua开发环境配置-CSDN博客 《ESP8266通信指南》10-MQTT通信&#xff08;Arduino开发&#xff09;-CSDN博客 《ESP8266通信指南》9-TCP通信&#xff08;Arudino开发&#xff09;-CSDN博客 《ESP8266通信指南》8-连接WIFI&#xff08;Arduino开发…

循环链表 -- c语言实现

#pragma once // 带头双向循环链表增删查改实现 #include<stdlib.h> #include<stdio.h> #include<assert.h>typedef int LTDataType;typedef struct ListNode {LTDataType data;struct ListNode* next;struct ListNode* prev; }ListNode;//双链表申请一个新节…

【Python】PTA 查验身份

知识点&#xff1a; 1.这里的加权求和就是指每一位乘以题目给的对应位置上的数字 在python中&#xff0c;对于int(10)这样的转换而来的直接是整数10&#xff0c;但是在c语言中会转换成ASCII值&#xff0c;所以要特别注意 2.本题中有两种情况是错误的&#xff0c;就是要直接输…

DES加密解密算法(简单、易懂、超级详细)

目录 一、基础补充 二、什么是DES算法 &#xff08;1&#xff09;对称加密算法 &#xff08;2&#xff09;非对称加密算法 &#xff08;3&#xff09;对称加密算法的应用 三、DES算法的基础操作步骤 1.明文的加密整体过程 2.F轮函数解析 3.密钥的形成过程 四、AC代码 五、D…

自然语言(NLP)

It’s time for us to learn how to analyse natural language documents, using Natural Language Processing (NLP). We’ll be focusing on the Hugging Face ecosystem, especially the Transformers library, and the vast collection of pretrained NLP models. Our proj…

JuiceFS v1.2-beta1,Gateway 升级,多用户场景权限管理更灵活

JuiceFS v1.2-beta1 今天正式发布。在这个版本中&#xff0c;除了进行了大量使用体验优化和 bug 修复外&#xff0c;新增三个特性&#xff1a; Gateway 功能扩展&#xff1a;新增了“身份和访问管理&#xff08;Identity and Access Management&#xff0c;IAM&#xff09;” 与…

WHM中如何查看磁盘使用情况

今日看到有用户在论坛留言反馈他买了Hostease 独立服务器并购买cPanel面板&#xff0c;想要通过面板查看当前服务器使用的磁盘情况&#xff0c;但是不知道如何查看。因为这边也是对于cPanel即WHM面板有是有所了解的&#xff0c;对于这个用户的问题&#xff0c; 操做步骤如下&am…

【Linux】Docker 安装部署 Nacos

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ 【Linux】Docker 安装部署 Nacos docker搜索na…

看完这篇文章我奶奶都懂Opentracing了(一)

前言 如果要基于Opentracing开发分布式链路追踪Java客户端工具包&#xff0c;首先肯定需要了解Opentracing中的各种概念&#xff0c;包括但不限于Span和Scope等&#xff0c;其实这些概念在Opentracing的官方文档中是有比较详尽的说明的&#xff0c;英文不好也能靠着机器翻译读…

【linux-IMX6ULL中断配置流程】

目录 1. Cortex-A7和GIC中断概述1. 1 Cortex-A7中断系统&#xff1a;1. 2 GIC中断控制器简介&#xff1a; 2. 中断配置概述3. 底层中断文件配置3.1 对启动文件.s的配置思路3.2 对中断函数配置思路 4. 上层中断配置流程 1. Cortex-A7和GIC中断概述 学习IMX6UL的中断处理系统&…

毕业就业信息|基于Springboot+vue的毕业就业信息管理系统的设计与实现(源码+数据库+文档)

毕业就业信息管理系统 目录 基于Springboot&#xff0b;vue的毕业就业信息管理系统设计与实现 一、前言 二、系统设计 三、系统功能设计 1学生信息管理 2 公司信息管理 3公告类型管理 4公告信息管理 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设…