Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】

news2025/2/5 6:02:50

欢迎来到我的博客,代码的世界里,每一行都是一个故事


在这里插入图片描述

Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案

    • 引言
    • 前言
    • Redis Streams的基本概念和特性
      • 1. 日志数据结构
      • 2. 消息和字段
      • 3. 消费者组
      • 4. 消息ID
      • 5. 实时和历史数据处理
      • 6. 性能和可靠性
    • 实战
      • maven依赖
      • 配置StreamConfig(监听)
      • 配置生产者
      • 配置消费者(组)
      • 配置初始化方法
      • 实现效果
    • 基于List和专业消息队列对比
      • 相比于Redis List解决的痛点:
      • 相比于专业高级队列的不足:
    • 总结

引言

Redis Stream解密:探秘数据流处理的黑科技【一】

解锁Redis Stream新境界:高级用法大揭秘【二】

Redis List:打造高效消息队列的秘密武器【redis实战 一】

前言

在快节奏的技术世界中,消息队列是连接不同服务和组件的关键。而在这个领域,Redis Streams作为一种新兴的消息队列解决方案,以其高性能和易用性吸引了众多开发者的目光。当这项技术遇到了Spring Boot —— 当今最受欢迎的Java开发框架,它们的结合将如何开启新的可能性?让我们开始这趟探索之旅,深入了解如何将这两种强大的技术融合在一起,打造出优雅而强大的消息队列系统。

Redis Streams的基本概念和特性

Redis Streams是Redis数据库的一个强大类型,于Redis 5.0中引入。它主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。以下是Redis Streams的一些基本概念和核心特性:

1. 日志数据结构

  • 持久化的消息日志:Redis Streams是一个按时间排序的消息日志。每条消息都存储在它被插入时的顺序位置,并且有一个唯一的ID标识。
  • 可追溯性:由于其日志特性,Redis Streams允许你访问历史消息,这对于消息的追溯、重放和延迟处理非常有用。

2. 消息和字段

  • 消息结构:每条消息都可以包含一个或多个字段(field)和值(value)。这类似于一个小的哈希结构,使得每条消息可以携带多个相关的数据点。
  • 灵活的数据模型:你可以根据应用的需要自由定义每条消息包含的字段和数据格式。

3. 消费者组

  • 支持多消费者:Redis Streams可以被多个消费者或多个消费者组同时读取,每个消费者组都会跟踪其成员的进度。
  • 消息确认:消费者读取并处理消息后,可以发送确认,表示消息已被处理。未确认的消息可以被再次处理,确保消息不会因消费者失败而丢失。
  • 故障处理:支持挂起的消息列表和消费者超时检测,使得在消费者失败时可以由其他消费者接手处理消息。

4. 消息ID

  • 自动生成或指定:消息ID通常由Redis自动生成,保证了全局唯一性和顺序性。你也可以手动指定ID以实现更复杂的场景。
  • 组成结构:ID由一个时间戳部分和一个序列号部分组成,格式为<时间戳>-<序列号>

5. 实时和历史数据处理

  • 实时消息处理:通过XREADXREADGROUP命令,你可以实时监听并处理新添加到流中的消息。
  • 历史消息查询:通过XRANGEXREVRANGE等命令,可以查询流中的历史消息,这对于数据分析、审计和消息重放非常有用。

6. 性能和可靠性

  • 高性能:Redis Streams设计用于处理高吞吐量的消息,能够支持每秒数百万消息的读写。
  • 持久化:与Redis的其他数据类型一样,Streams的数据可以持久化到磁盘,保证了数据的持久性和可靠性。

实战

maven依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

配置StreamConfig(监听)

package fun.bo.config;

import fun.bo.consumer.MessageConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import java.time.Duration;

/**
 * @author xiaobo
 */
@Configuration
public class StreamConfig {

    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(
            RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {

        // 用于配置消息监听容器的选项。在这个方法中,通过设置不同的选项,如轮询超时时间和消息的目标类型,可以对消息监听容器进行个性化的配置。
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 设置了轮询超时的时间为100毫秒。这意味着当没有新的消息时,容器将每隔100毫秒进行一次轮询。
                        .pollTimeout(Duration.ofMillis(100))
                        // 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型,以便在后续的处理中使用。
                        .targetType(String.class)
                        .build();

        // 创建一个可用于监听Redis流的消息监听容器。
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer =
                StreamMessageListenerContainer.create(connectionFactory, options);

        // 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量,以确定从哪里开始读取消息。
        listenerContainer.receive(
                Consumer.from("your-consumer-group", "your-consumer-name"),
                StreamOffset.create("your-stream-name", ReadOffset.lastConsumed()), messageConsumer);

        // 方法启动了消息监听容器,使其开始监听消息。一旦容器被启动,它将开始接收并处理来自Redis流的消息。
        listenerContainer.start();

        return listenerContainer;
    }
}

配置生产者

package fun.bo.produce;

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
 * @author xiaobo
 */
@Service
@RequiredArgsConstructor
public class MessageProducer {

    private final RedisTemplate<String, String> redisTemplate;


    public void sendMessage(String streamKey, String messageKey, String message) {
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put(messageKey, message);

        RecordId recordId = redisTemplate.opsForStream().add(streamKey, messageMap);
        if (recordId != null) {
            System.out.println("Message sent to Stream '" + streamKey + "' with RecordId: " + recordId);
        }
    }
}

配置消费者(组)

package fun.bo.consumer;

import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;

/**
 * @author xiaobo
 */
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {

    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        String stream = message.getStream();
        String messageId = message.getId().toString();
        String messageBody = message.getValue();

        System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);
        System.out.println("Message body: " + messageBody);
    }
}

配置初始化方法

如果是已经存在stream,则可以不配置,这个主要是为了防止启动报错,org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key ‘your-stream-name’ or consumer group ‘your-consumer-group’ in XREADGROUP with GROUP option

public void initializeStream() {

  StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();

  // 创建一个流
  try {
    streamOperations.createGroup("your-stream-name", ReadOffset.from("0"), "your-consumer-group");
  } catch (Exception e) {
    // 流可能已存在,忽略异常
  }
}

实现效果

在这里插入图片描述

基于List和专业消息队列对比

Redis Streams作为消息队列相比于使用传统的Redis List类型,引入了一系列改进和新功能,同时也与专业的高级消息队列系统(如RabbitMQ、Kafka等)相比存在一些差距。以下是详细的分析:

相比于Redis List解决的痛点:

  1. 更好的消息顺序保证

    • List:虽然List可以保持插入顺序,但在高并发情况下,确保生产者和消费者的顺序一致性较为复杂。
    • Streams:提供了全局唯一的、基于时间的ID来标识消息,确保了消息的全局顺序。
  2. 消费者组支持

    • List:原生List类型不支持消费者组的概念,实现多消费者协调处理同一任务队列较为复杂。
    • Streams:原生支持消费者组,允许多个消费者共享负载,并跟踪各自的进度。
  3. 消息持久化和读取

    • List:读取或消费消息后,需要显式删除,否则会一直保留在List中,处理大量消息时可能会导致内存问题。
    • Streams:消息即使被消费,仍然保留在Stream中,可以随时查询历史消息,且不会因消费而被移除。
  4. 复杂的读取操作

    • List:List提供的操作相对简单,复杂的读取逻辑(如按时间范围查询)需要额外的逻辑来实现。
    • Streams:提供了复杂的查询命令,如XRANGEXREVRANGE,可以按ID范围(时间范围)查询消息。
  5. 消息确认和重试

    • List:需要手动实现消息确认和重试机制,管理起来较为复杂。
    • Streams:提供了消息确认(XACK)和挂起消息查询(XPENDING)的功能,使得消息的重试和故障处理更加容易。

相比于专业高级队列的不足:

  1. 事务和消息持久性保证

    • Redis Streams:虽然提供持久化,但在处理复杂事务和确保消息持久性方面不如一些专业的消息队列系统(如Kafka的WAL日志)。
  2. 集群和分区

    • Redis Streams:在集群环境下使用稍显复杂,且对于数据分区和扩展性的支持不如专业的消息队列系统(如Kafka的分区机制)。
  3. 管理和监控工具

    • Redis Streams:虽然有基本的监控命令,但没有一些高级消息队列系统提供的丰富的管理界面和监控工具。
  4. 高级消息路由和过滤

    • Redis Streams:缺乏一些高级消息队列提供的消息路由和过滤功能(如RabbitMQ的Exchange和Binding)。
  5. 消息传递语义

    • Redis Streams:提供了基础的至少一次处理语义,但可能不像某些系统那样支持严格的只处理一次语义。

总结

Redis Streams提供了一个轻量级、高性能且功能丰富的消息队列实现,解决了使用List作为队列时的许多痛点,特别适合需要快速部署、低延迟和简单可靠的场景。然而,对于需要复杂事务处理、高级路由和过滤、或更丰富管理工具的复杂应用场景,专业的消息队列系统可能更加适合。选择哪种方案,应根据你的具体需求、资源和技术栈来决定。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1337133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1.决策树

目录 1. 什么是决策树? 2. 决策树的原理 2.1 如何构建决策树&#xff1f; 2.2 构建决策树的数据算法 2.2.1 信息熵 2.2.2 ID3算法 2.2.2.1 信息的定义 2.2.2.2 信息增益 2.2.2.3 ID3算法举例 2.2.2.4 ID3算法优缺点 2.2.3 C4.5算法 2.2.3.1 C4.5算法举例 2.2.4 CART算法 2.2.4…

智能优化算法应用:基于孔雀算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于孔雀算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于孔雀算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.孔雀算法4.实验参数设定5.算法结果6.参考文献7.MA…

机械革命极光Pro重装Win10系统图解

机械革命极光Pro是性能优秀的笔记本电脑&#xff0c;深受广大用户的喜欢&#xff0c;现在用户想给笔记本电脑重新安装一下操作系统&#xff0c;但不知道重装系统的详细步骤。下面小编将带来机械革命极光Pro笔记本电脑重装系统Win10版本的步骤介绍&#xff0c;帮助更多的用户完成…

Python 基础面试第三弹

1. 获取当前目录下所有文件名 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import os def get_all_files(directory): file_list []<br> # <code>os.walk</code>返回一个生成器&#xff0c;每次迭代时返回当前目录路径、子目录列表和文件列表 for…

【Kafka】Kafka客户端认证失败:Cluster authorization failed.

背景 kafka客户端是公司内部基于spring-kafka封装的spring-boot版本&#xff1a;3.xspring-kafka版本&#xff1a;2.1.11.RELEASE集群认证方式&#xff1a;SASL_PLAINTEXT/SCRAM-SHA-512经过多年的经验&#xff0c;以及实际验证&#xff0c;配置是没问题的&#xff0c;但是业务…

数据结构:图文详解 树与二叉树(树与二叉树的概念和性质,存储,遍历)

目录 一.树的概念 二.树中重要的概念 三.二叉树的概念 满二叉树 完全二叉树 四.二叉树的性质 五.二叉树的存储 六.二叉树的遍历 前序遍历 中序遍历 后序遍历 一.树的概念 树是一种非线性数据结构&#xff0c;它由节点和边组成。树的每个节点可以有零个或多个子节点…

113基于matlab的PSO-SVM多输入单输出预测程序

基于matlab的PSO-SVM多输入单输出预测程序。PSO对SVM的两个参数进行优化得到最佳参数值进行预测。并输出预测误差等相应结果。程序已调通&#xff0c;可直接运行。 113matlabPSO-SVM多输入单输出 (xiaohongshu.com)

普中STM32-PZ6806L开发板(STM32CubeMX创建项目并点亮LED灯)

简介 搭建一个用于驱动 STM32F103ZET6 GPIO点亮LED灯的任务;电路原理图 LED电路原理图 芯片引脚连接LED驱动引脚原理图 创建一个点亮LED灯的Keil 5项目 创建STM32CubeMX项目 New Project -> 单击 -> 芯片搜索STM32F103ZET6->双击创建 初始化时钟 初始化LED G…

基于双闭环PI的SMO无速度控制系统simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 基于双闭环PI的SMO无速度控制系统simulink建模与仿真&#xff0c;基于双闭环PI的SMO无速度控制系统主要由两个闭环组成&#xff1a;一个是电流环&#xff0c;另一个是速度环。…

Flink CDC 1.0至3.0回忆录

Flink CDC 1.0至3.0回忆录 一、引言二、CDC概述三、Flink CDC 1.0&#xff1a;扬帆起航3.1 架构设计3.2 版本痛点 四、Flink CDC 2.0&#xff1a;成长突破4.1 DBlog 无锁算法4.2 FLIP-27 架构实现4.3 整体流程 五、Flink CDC 3.0&#xff1a;应运而生六、Flink CDC 的影响和价值…

数据库原理及应用·数据库保护

7.1 事务 7.1.1 事务定义 1.事务是用户定义的一个数据操作序列&#xff0c;这些操作要么全部执行、要么全部不执行&#xff0c;是一个不可分割的工作单元。 事务是恢复和并发控制的基本单位 事务的两种方式&#xff1a; 7.1.2 事务处理模型 1.ISO事务处理模型&#xff1a…

隐私第一:在几分钟内部署本地大语言模型!

彻底改变您的数据安全游戏&#xff1a;快速无缝部署本地大语言模型&#xff0c;实现无与伦比的隐私! 2023年是人工智能领域加速发展的一年。除了健壮的商业上可用的大型语言模型之外&#xff0c;还出现了许多值得称赞的开源方案&#xff0c;例如Llama2、Codellama、Mistral和Vi…

鸿蒙开发中的坑(持续更新……)

最近在使用鸿蒙开发时&#xff0c;碰到了一些坑&#xff0c;特做记录&#xff0c;如&#xff1a;鸿蒙的preview不能预览&#xff0c;轮播图组件Swiper使用时的问题&#xff0c;console.log() 打印的内容 一、鸿蒙的preview不能预览 首先&#xff0c;只有 ets文件才能预览。 其…

HarmonyOS应用抓包实战

Charles抓包原理 Charles是一个HTTP代理服务器,HTTP监视器,反转代理服务器&#xff0c;当浏览器连接Charles的代理访问互联网时&#xff0c;Charles可以监控浏览器发送和接收的所有数据。 在开发OpenHarmony/HarmonyOS应用开发时&#xff0c;我们使用的是ohos/axios来进行网络…

2023.12.25 关于 Redis 数据类型 Hash 常用命令、内部编码、应用场景

目录 Hash 数据类型 Hash 操作命令 HSET HGET HEXISTS HDEL HKEYS HVALS HGETALL HMGET HLEN HSETNX HINCRBY HINCRBYFLOAT HSTRLEN Hash 编码方式 理解什么是压缩 Hash 实际应用 Cache 缓存 Hash 数据类型 整体上来说 Redis 是键值对结构&#xff0c;其中 …

基于JSP+Servlet+Mysql的学生宿舍管理系统(简单的增删改查)

基于JSPServletMysql的学生宿舍管理系统 一、系统介绍二、功能展示1.登陆、注册2.主页3.增加3.修改4.删除 四、其它1.其他系统实现五.获取源码 一、系统介绍 项目名称&#xff1a;基于JSPServletMysql的学生宿舍管理系统(简单的增删改查) 项目架构&#xff1a;B/S架构 开发语…

电视盒子什么品牌好?经销商分享线下热销电视盒子排行榜

做实体数码店已经超过六年了&#xff0c;我对电视盒子这行是非常了解的&#xff0c;品牌的优势和特色都有研究&#xff0c;超级多网友在讨论电视盒子什么品牌好&#xff0c;我整理了店铺内销量最高的电视盒子排行榜&#xff0c;想知道目前哪些电视盒子最受消费者欢迎&#xff0…

真实案例扫描APP开发——基于实例分割实现拍照文档实时边缘检测(C++/JNI实现)

前言 这是一个安卓NDK的项目&#xff0c;想要实现的效果就是拍照扫描&#xff0c;这里只涉及到的只有边缘检测&#xff0c;之后会写文档滤镜、证件识别与证件1比1打印&#xff0c;OCR、版面分析之后的文档还原。我的开发环境是Android Studio 北极狐&#xff0c;真机是华为mat…

详解Keras3.0 Layer API: LSTM layer

LSTM layer 用于实现长短时记忆网络&#xff0c;它的主要作用是对序列数据进行建模和预测。 遗忘门&#xff08;Forget Gate&#xff09;&#xff1a;根据当前输入和上一个时间步的隐藏状态&#xff0c;计算遗忘门的值。遗忘门的作用是控制哪些信息应该被遗忘&#xff0c;哪些…

最新版手机无人直播硬改虚拟摄像头,支持多平台修改手机摄像头【硬改神器+使用教程】

最新版手机无人直播助手App安卓版介绍&#xff1a; 顺哥轻创V:shundazy1 这是一款兼容性强大的手机无人直播工具&#xff0c;是无人直播神器&#xff0c;不依赖电脑&#xff0c;手机无需root权限&#xff0c;不需要装xp框架&#xff0c;支持主流平台兼容性极佳&#xff0c;1V…