Redis 消息队列 Stream

news2025/1/10 16:39:16

tip:作为程序员一定学习编程之道,一定要对代码的编写有追求,不能实现就完事了。我们应该让自己写的代码更加优雅,即使这会费时费力。

💕💕 推荐:体系化学习Java(Java面试专题)

文章目录

  • 1、什么是 Stream
  • 2、为什么要设计 Stream
  • 3、Stream 命令详解
  • 4、java 写一点 Stream 的 demo
  • 5、Stream 的应用场景

1、什么是 Stream

Stream 是 Redis 5.0 版本中新增的一种数据结构,它是一个高性能、持久化的消息队列,可以用于实现消息的发布和订阅。Stream 可以看作是一个有序的消息队列,每个消息都有一个唯一的 ID,可以根据 ID 进行消息的查找、删除和确认。在 Stream 中,消息以键值对的形式存储,可以存储任意类型的数据。Stream 还支持多个消费者组,每个消费者组可以独立消费消息,避免消息重复消费。Stream 的引入使得 Redis 在消息队列领域更具竞争力,同时也为开发者提供了一种高效、可靠的消息处理方式。

2、为什么要设计 Stream

Redis 设计 Stream 的原因主要是为了满足大规模实时数据处理的需求。在传统的消息队列中,消息的消费者只能消费最新的消息,而无法消费过去的消息。而在实时数据处理中,往往需要对历史数据进行分析和处理,因此需要一种能够存储大量历史数据并支持快速查询和消费的数据结构。Stream 的引入解决了这个问题,它支持持久化存储和快速查询,可以存储大量历史数据,并且支持多个消费者组独立消费消息,从而满足了大规模实时数据处理的需求。此外,Stream 还支持消息的延迟和重试等功能,使得 Redis 在消息队列领域更具竞争力。

3、Stream 命令详解

Stream 是 Redis 5.0 版本新增的一种数据结构,支持高性能、持久化的消息队列。下面是 Stream 命令的详细介绍:

  1. XADD key ID field1 value1 [field2 value2 …]:向指定的 Stream 中添加一条消息,消息的 ID 由用户指定,消息的字段和值由用户指定。
127.0.0.1:6379> XADD mystream 1000 name John age 30
"1000-0"
  1. XRANGE key start end [COUNT count]:返回指定 Stream 中指定范围内的消息,范围由 start 和 end 指定,可以使用 “-” 表示最大或最小 ID,COUNT 参数表示返回消息的数量。
127.0.0.1:6379> XRANGE mystream 1000-0 1000-1
1) 1) "1000-0"
      2) 1) "name"
         2) "John"
         3) "age"
         4) "30"
  1. XREVRANGE key end start [COUNT count]:同 XRANGE 命令,但是返回的消息是逆序的。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREVRANGE mystream 1003-0 1001-0
1) 1) "1003-0"
      2) 1) "name"
         2) "Jack"
         3) "age"
         4) "30"
   2) 1) "1002-0"
      2) 1) "name"
         2) "Mary"
         3) "age"
         4) "28"
   3) 1) "1001-0"
      2) 1) "name"
         2) "Tom"
         3) "age"
         4) "25"
  1. XLEN key:返回指定 Stream 中消息的数量。
127.0.0.1:6379> XLEN mystream
1
  1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定 Stream 中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREAD COUNT 2 BLOCK 1000 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
      2) 1) "1002-0"
         2) 1) "name"
            2) "Mary"
            3) "age"
            4) "28"
  1. XACK key group ID [ID …]:确认指定消费者组已经处理了指定 ID 的消息。
127.0.0.1:6379> XACK mystream mygroup 1000-0
(integer) 1
  1. XGROUP CREATE key groupname ID [MKSTREAM]:创建一个新的消费者组,可以指定组名和起始 ID,如果指定 MKSTREAM 参数,则会自动创建 Stream。
127.0.0.1:6379> XGROUP CREATE mystream mygroup 1000-0
OK
  1. XGROUP SETID key groupname ID:设置消费者组的起始 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP SETID mystream mygroup 1002-0
OK
  1. XGROUP DESTROY key groupname:销毁指定的消费者组。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP DESTROY mystream mygroup
(integer) 1
  1. XGROUP DELCONSUMER key groupname consumername:从指定消费者组中删除指定的消费者。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
  1. XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定消费者组中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1000-0"
         2) 1) "name"
            2) "John"
            3) "age"
            4) "30"
  1. XCLAIM key groupname consumername min-idle-time ID [ID …] [IDLE milliseconds] [TIME milliseconds] [RETRYCOUNT count] [FORCE]:从指定消费者组中获取一条未被确认的消息,并将其标记为正在处理,可以指定最小空闲时间、最大空闲时间、重试次数等参数。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XCLAIM mystream mygroup myconsumer 0 1001-0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
  1. XDEL key ID [ID …]:从指定 Stream 中删除指定 ID 的消息。
127.0.0.1:6379> XDEL mystream 1000-0
(integer) 1
  1. XTRIM key MAXLEN [~] count:删除 Stream 中多余的消息,可以指定删除的数量或者删除到指定的 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XADD mystream 1004 name Lucy age 27
"1004-0"
127.0.0.1:6379> XADD mystream 1005 name Bob age 32
"1005-0"

127.0.0.1:6379> XTRIM mystream MAXLEN 3
(integer) 2

4、java 写一点 Stream 的 demo

我这边 redis 的版本是 Redis-x64-5.0.14.1,windows 上玩的,绿色版的,下载地址https://github.com/tporadowski/redis/releases

在这里插入图片描述
一下是 demo 的代码:

package com.pany.camp.redis;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamConsumersInfo;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;

import java.util.*;

public class RedisStreamDemo {
    private static final Logger logger = LoggerFactory.getLogger(RedisStreamDemo.class);
    private static final int MESSAGE_READ_COUNT = 1;
    private static final long MESSAGE_READ_TIMEOUT = 120000L;

    public static void main(String[] args) {
        // 创建 Jedis 实例
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // 定义 Stream 名称和消费者组名称
            String streamName = "mystream";
            String groupName = "mygroup5";
            // 创建消费者组
            try {
                jedis.xgroupCreate(streamName, groupName, new StreamEntryID(), true);
            } catch (JedisDataException e) {
                // 如果 Stream 已经存在,则忽略异常
                if (!e.getMessage().contains("BUSYGROUP")) {
                    throw e;
                }
            }
            logger.info("消费者组已创建");
            // 添加消息到 Stream 中
            Map<String, String> fields = new HashMap<>();
            fields.put("field1", "value1");
            fields.put("field2", "value2");
            StreamEntryID messageId = jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, fields);
            logger.info("消息已添加到 Stream 中,消息内容为:{}", JSONObject.toJSONString(fields));
            // 读取消息
            Map.Entry<String, StreamEntryID> streams = new AbstractMap.SimpleImmutableEntry<>(streamName,
                    new StreamEntryID().UNRECEIVED_ENTRY);
            List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(groupName, "c1", MESSAGE_READ_COUNT,
                    MESSAGE_READ_TIMEOUT, true, streams);
            logger.info("从 Stream 中读取了 {} 条消息", messages.size());
            for (Map.Entry<String, List<StreamEntry>> entry : messages) {
                String sn = entry.getKey();
                List<StreamEntry> streamMessages = entry.getValue();
                for (StreamEntry message : streamMessages) {
                    logger.info("Stream 名称:{}", sn);
                    logger.info("Message ID:{}", message.getID());
                    logger.info("Message fields:{}", message.getFields());
                }
            }
            // 确认消息已经被消费
            jedis.xack(streamName, groupName, messageId);
            logger.info("消息已确认消费");
            // 删除消息
            jedis.xdel(streamName, messageId);
            logger.info("消息已删除");
        } catch (Exception e) {
            logger.error("执行 Redis 操作出错", e);
        }
    }
}
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.7.0</version>
</dependency>

在这里插入图片描述

5、Stream 的应用场景

Redis Stream 的常用的应用场景:

  1. 消息队列:Stream 可以作为一个高性能的消息队列使用,支持多个消费者对同一 Stream 进行消费,且支持消费者组的管理、消息确认和消息持久化等功能。

  2. 日志收集:Stream 可以作为一个分布式的日志收集系统使用,支持多个客户端将日志写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。

  3. 实时数据处理:Stream 可以作为一个实时数据处理系统使用,支持多个客户端将实时数据写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。

  4. 事件驱动架构:Stream 可以作为一个事件驱动架构的基础设施使用,支持多个事件源将事件写入到同一 Stream 中,且支持按照事件类型和时间戳进行查询和过滤。

💕💕 本文由激流原创,首发于CSDN博客,博客主页 https://blog.csdn.net/qq_37967783?spm=1010.2135.3001.5421
💕💕喜欢的话记得点赞收藏啊

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/630089.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电脑技巧:Windows微信3.9.5更新一览

目录 01、新增锁定功能 02、可直接撤回正在发送的消息 03、翻译多个网页 04、搜一搜新增历史记录 05、视频号页面再次优化 近期&#xff0c;Windows微信又更新至3.9.5版本&#xff0c;新增了许多实用的功能&#xff0c;以下将对这些新功能进行介绍。 官方更新内容&#x…

mybatis-plus用法(一)

MyBatis-plus 是一款 Mybatis 增强工具&#xff0c;用于简化开发&#xff0c;提高效率。下文使用缩写 mp来简化表示 MyBatis-plus&#xff0c;本文主要介绍 mp 整合 Spring Boot 的使用。 (5条消息) mybatis-plus用法&#xff08;二&#xff09;_渣娃工程师的博客-CSDN博客 1…

【K哥爬虫普法】一个人、一年半、挣了2000万!

我国目前并未出台专门针对网络爬虫技术的法律规范&#xff0c;但在司法实践中&#xff0c;相关判决已屡见不鲜&#xff0c;K 哥特设了“K哥爬虫普法”专栏&#xff0c;本栏目通过对真实案例的分析&#xff0c;旨在提高广大爬虫工程师的法律意识&#xff0c;知晓如何合法合规利用…

2023免费版电脑视频剪辑软件会声会影

提到视频剪辑软件&#xff0c;浮现在我们脑海的可能就是满屏的功能键和眼花缭乱的操作界面。类似pr、AE之类的视频软件&#xff0c;操作界面看起来十分复杂&#xff0c;很多用户上手困难。而会声会影界面简单&#xff0c;功能齐全&#xff0c;也能完成专业级的视频制作。操作简…

基于SSM+JSP的疫情居家办公OA系统设计与实现

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架下…

springboot+vue.js大学生竞赛报名作品评分管理系统

本文介绍了大学生竞赛管理系统的开发全过程。通过分析大学生竞赛管理系统管理的不足&#xff0c;创建了一个计算机管理大学生竞赛管理系统的方案。文章介绍了大学生竞赛管理系统的系统分析部分&#xff0c;包括可行性分析等&#xff0c;系统设计部分主要介绍了系统功能设计和数…

Get请求如何传递数组、对象

文章目录 Get请求如何传递数组1、将数组参数传递多次2、直接将数组指用逗号分隔 Get请求如何传递对象 Get请求如何传递数组 1、将数组参数传递多次 可以将数组参数传递多次&#xff0c;springmvc会将多个同名参数自动封装成数组或者集合对象&#xff0c;示例如下&#xff1a;…

JavaScript库:jQuery,简化编程

1. jQuery介绍 官方网站 : https://jquery.com jQuery 是一个 JavaScript 库 。极大地简化了 JavaScript 编程&#xff0c;例如 JS 原生代码几十行 实现的功 能&#xff0c; jQuery 可能一两行就可以实现&#xff0c;因此得到前端程序猿广泛应用。 发展至今&#xff0…

chatgpt赋能python:Python如何保存数据到CSV文件中

Python如何保存数据到CSV文件中 作为一门广泛应用于数据分析和机器学习的编程语言&#xff0c;Python提供了许多方法来处理和保存数据。其中之一是将数据保存到CSV文件中。本篇文章将介绍如何使用Python保存数据到CSV文件&#xff0c;在此过程中&#xff0c;我们会提到一些有用…

【保姆级】如何创建一个Vue工程

如何创建一个Vue工程 文章目录 如何创建一个Vue工程1、下载安装Node.js2、配置环境变量3、npm 安装淘宝镜像4、安装Vue CliVue 安装失败原因 5、在线创建工程创建工程启动服务启动报错停止服务重启服务 1、下载安装Node.js Node.js是一个js运行环境&#xff0c;Vue工程需要建立…

端午节特别活动 | 在 Python 中制作端午节游戏

端午节将至&#xff0c;为了丰富人们的节日生活&#xff0c;CSDN为大家带来了特别的端午礼包。作为一名程序员&#xff0c;我们又该如何在节日中发挥自己的特长呢&#xff1f;在本篇文章中&#xff0c;我们将使用 Python 制作一个端午节相关的小游戏&#xff0c;让大家在游戏中…

mac m1/m2 芯片安装 ps 2023 插件无法显示扩展界面

碎碎念&#xff1a;一直在踩坑的路上&#xff0c;甚至想休息时间玩一会儿 ps 都能踩坑 问题描述 新的 m2 芯片 mac 安装了色环插件后&#xff0c;在窗口界面中没有找到扩展&#xff0c;且在首选项->增效工具的旧版扩展也是灰色的 题外话&#xff1a;记录一下 mac 的 photo…

字节跳动大数据容器化构建与落地实践

动手点关注 干货不迷路 随着字节跳动旗下业务的快速发展&#xff0c;数据急剧膨胀&#xff0c;原有的大数据架构在面临日趋复杂的业务需求时逐渐显现疲态。而伴随着大数据架构向云原生演进的行业趋势&#xff0c;字节跳动也对大数据体系进行了云原生改造。本文将详细介绍字节跳…

指令模板:技术文档设计与结构化内容架构 | AIGC实践

【题外话】 在上一篇文章中&#xff0c;有朋友反馈说&#xff0c;【见睿思齐】的字号设置得太小了&#xff0c;读起来有点儿费劲。 首先&#xff0c;特别感谢这位热心读者&#xff0c;开诚布公地与我分享感受&#xff0c;提出宝贵意见&#xff0c;帮助我做得更好。 因此在这篇文…

钉钉机器人客服系统AI知识库对接

钉钉机器人比较灵活方便&#xff0c;可以按照下面的方式操作&#xff0c;我们现在创建企业内部应用机器人可以单聊&#xff0c;也可以在群里进行&#xff0c;机器人会通过GPT私有数据知识库自动回复 应用场景 企业内部知识库机器人&#xff0c;企业员工可以在钉钉上对内部的知识…

基于SSM+Vue的药品商超销售进销存网站设计与实现

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架下…

硬件设计电源系列文章-DCDC转换器电感和电容的选择

文章目录 概要 整体架构流程 技术名词解释 技术细节 1、计算必要的电感值 2、计算电感的最大电流 3. 输出电容的选定 4.输入电容的选定 5.Cbypass去耦电容 小结 概要 提示&#xff1a;这里可以添加技术概要 例如&#xff1a; 本文将深入分享在电源设计中一些设计规则。 整体…

一文彻底弄懂ConcurrentHashMap,轻松应对面试官!

文章目录 相关锁synchronizedCASvolatile&#xff08;非锁&#xff09;自旋锁分段锁ReentrantLock 正文JDK1.7 中的 ConcurrentHashMapget方法put方法 JDK1.8 中的 ConcurrentHashMapput方法initTable 初始化数组helpTransfer 协助扩容addCount 扩容判断get方法reomve 方法 提问…

解读共识算法Raft

文章目录 共识算法的特征1、领导者选举1.1 成员身份1.2 节点如何通信&#xff1f;1.3 什么是任期&#xff1f;1.4 选举领导者的过程以及选举的规则1.5 如何理解随机超时时间&#xff1f; 2、日志复制2.1 如何理解日志&#xff1f;2.2 如何复制日志&#xff1f;2.3 如何实现日志…

不愧是阿里大牛整理的java高级工程师面试 1000 题,面面俱到,太全了

4 月份利用空余时间收集整理了一份《java 高级工程师面试 1000 题》&#xff0c;初衷也很简单&#xff0c;就是希望在面试复习的时候能够帮助到大家&#xff0c;减轻大家的负担和节省时间。 前几天定了初稿&#xff0c;朋友圈分享了一波&#xff0c;收到了很多建议&#xff0c…