使用Redis实现消息队列:List、Pub/Sub和Stream的实践

news2025/1/8 5:51:18

摘要

Redis是一个高性能的键值存储系统,它的多种数据结构使其成为实现消息队列的理想选择。本文将探讨如何使用Redis的List、Pub/Sub和Stream数据结构来实现一个高效的消息队列系统。

1. 消息队列的基本概念

消息队列是一种应用程序之间进行通信的机制,允许应用程序以异步的方式发送和接收消息。它在分布式系统中用于解耦服务组件,提高系统的可扩展性和可靠性。

2. Redis作为消息队列的优势

  • 高性能:Redis是基于内存的操作,读写速度极快。
  • 多种数据结构:支持List、Set、Pub/Sub等多种数据结构,适用于不同的使用场景。
  • 持久化:支持数据的持久化,保证消息的不丢失。
  • 原子操作:支持事务和原子操作,确保消息队列操作的一致性。

3. 使用List实现消息队列

List是Redis中的基本数据结构之一,可以用作简单的消息队列。

3.1 基本操作

  • 生产者:使用LPUSH命令将消息插入到List的头部。
  • 消费者:使用BRPOP命令从List的尾部阻塞式地获取消息。

3.2 实现示例

// 生产者
jedis.lpush("queue", "message");

// 消费者
String message = jedis.brpop(0, "queue");

4. 使用Pub/Sub实现发布/订阅模式

Pub/Sub是一种消息发布和订阅的模式,可以实现一对多的消息传递。

4.1 基本操作

  • 发布者:使用PUBLISH命令发布消息到指定的频道。
  • 订阅者:使用SUBSCRIBE命令订阅频道,接收消息。

4.2 实现示例

// 发布者
jedis.publish("channel", "message");

// 订阅者
jedis.subscribe(new JedisPubSub() {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("Received: " + message);
    }
}, "channel");

5. 使用Stream实现消息队列

Stream是Redis 5.0引入的新的持久化数据结构,专为消息队列和日志功能设计。

5.1 基本操作

  • 生产者:使用XADD命令向Stream添加消息。
  • 消费者:使用XREAD命令从Stream中读取消息。

5.2 实现示例

// 生产者
String messageId = jedis.xadd("stream", StreamEntry.entry("field1", "value1"));

// 消费者
List<Map.Entry<String, String>> messages = jedis.xread(StreamsXReadParams.STREAMS.entry("stream", messageId));

5.3 使用Lua脚本和Redis Stream实现高效消息队列

1. Lua脚本在Redis中的优势
  • 原子性:Lua脚本在Redis内部执行,保证了操作的原子性。
  • 性能:减少了网络往返次数,提高了执行效率。
  • 灵活性:可以编写复杂的逻辑,适应不同的业务需求。
2. 使用Lua脚本操作Redis Stream
2.1 基本操作
  • 生产者:使用XADD命令向Stream添加消息。
  • 消费者:使用XREAD命令从Stream中读取消息。
  • 消费者组:使用XREADGROUP命令实现消费者组的功能。
2.2 Lua脚本示例

以下是一个简单的Lua脚本示例,用于实现生产者和消费者的基本操作。

-- 生产者脚本
local function produce(streamKey, message)
    local result = redis.call('XADD', streamKey, '*', 'message', message)
    return result
end

-- 消费者脚本
local function consume(streamKey, groupName, consumerName)
    local result = redis.call('XREADGROUP', 'GROUP', groupName, consumerName, 'BLOCK', 5000, 'STREAMS', streamKey, 0)
    return result
end

-- 调用脚本
local streamKey = 'myStream'
local message = 'Hello, Redis Stream!'
local groupName = 'myGroup'
local consumerName = 'myConsumer'

-- 生产消息
local messageId = produce(streamKey, message)

-- 消费消息
local messages = consume(streamKey, groupName, consumerName)

3. 消费者组的使用

消费者组是Redis Stream的一个特性,允许多个消费者实例协调工作,共同消费Stream中的消息。

3.1 创建消费者组
redis.call('XGROUP', 'CREATE', streamKey, groupName, '$', 'MKSTREAM')
3.2 消费者组的读取
redis.call('XREADGROUP', 'GROUP', groupName, consumerName, 'BLOCK', 5000, 'STREAMS', streamKey, '>')

4. 总结

使用Lua脚本和Redis Stream实现消息队列,可以充分利用Redis的高性能和Lua脚本的原子性,构建一个高效、可靠的消息队列系统。消费者组的特性进一步增强了消息队列的可用性和扩展性。

5. 注意事项
  • 确保Lua脚本在执行前进行了充分的测试。
  • 考虑到消息的持久化和安全性,合理配置Redis的持久化策略。
  • 在生产环境中,监控消息队列的性能和状态,确保系统的稳定运行。
6. 参考文献
  • Redis Stream官方文档
  • Redis Lua脚本文档

6. 总结

Redis提供了多种方式来实现消息队列,每种方式都有其适用场景。List适用于简单的队列需求,Pub/Sub适用于发布/订阅模式,而Stream则提供了更强大的消息队列功能,包括持久化、消费者组等特性。
在这里插入图片描述

7. 参考文献

  • Redis官方文档
  • Redisson - Redisson provides several distributed data structures

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1890757.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker容器间网络仿真工具-pumba

docker-tc&pumba docker-tc:docker-tc项目仓库 pumba:pumba项目仓库 这两个项目理论上都可以实现对容器间的网络环境进行各种模拟干预&#xff0c;包括延迟&#xff0c;丢包&#xff0c;带宽限制等。 但是我在实际使用时&#xff0c;发现docker-tc这个工具在进行网络进行模…

Docker 部署 Minio 对象存储服务器

文章目录 Github官网文档简介dockerdocker-compose.ymlmc 客户端mc 基础命令Golang 示例创建 test 账号密钥文件上传示例 Github https://github.com/minio/minio 官网 https://min.io/https://www.minio.org.cn/ 文档 https://www.minio.org.cn/docs/minio/kubernetes/up…

摸爬滚打半年,我是如何从小白进阶到渗透测试工程师

前言 工作也好几年了&#xff0c;在这摸爬滚打中&#xff0c;遇到了服务器被黑&#xff0c;网站被人DDOS攻击&#xff0c;数据库被篡改等等。服务器也不是你说不让人上就不让人上的&#xff0c;所以IT安全这个话题还是比较沉重的&#xff0c;涉及的东西很多&#xff0c;只有你…

个人博客|PHP源码|支持多国语言切换

一. 前言 今天小编给大家带来了一款可学习&#xff0c;可商用的&#xff0c;支持多国语言的个人博客网站源码&#xff0c;支持二开&#xff0c;无加密。此博客相当简洁&#xff0c;也适合海外。详细界面和功能见下面视频演示。 如果您正好有此需求源码&#xff0c;请联系小编…

SSM学习4:spring整合mybatis、spring整合Junit

spring整合mybatis 之前的内容是有service层&#xff08;业务实现层&#xff09;、dao层&#xff08;操作数据库&#xff09;&#xff0c;现在新添加一个domain&#xff08;与业务相关的实体类&#xff09; 依赖配置 pom.xml <?xml version"1.0" encoding&quo…

一篇文章搞懂弹性云服务器和轻量云服务器的区别

前言 在众多的云服务器类型中&#xff0c;弹性云服务器和轻量云服务器因其各自的特点和优势&#xff0c;受到了广大用户的青睐。那么&#xff0c;这两者之间到底有哪些区别呢&#xff1f;本文将为您详细解析。 弹性云服务器&#xff1a;灵活多变的计算资源池 弹性云服务器&…

鸿蒙开发设备管理:【@ohos.sensor (传感器)】

传感器 说明&#xff1a; 本模块首批接口从API version 8开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import sensor from ohos.sensor;sensor.on ACCELEROMETER on(type: SensorType.SENSOR_TYPE_ID_ACCELEROMETER, callback: C…

04-ArcGIS For JavaScript的可视域分析功能

文章目录 综述代码实现代码解析结果 综述 在数字孪生或者实景三维的项目中&#xff0c;视频融合和可视域分析&#xff0c;一直都是热点问题。Cesium中&#xff0c;支持对阴影的后处理操作&#xff0c;通过重新编写GLSL代码就能实现视域和视频融合的功能。ArcGIS之前支持的可视…

Vue+Xterm.js+WebSocket+JSch实现Web Shell终端

一、需求 在系统中使用Web Shell连接集群的登录节点 二、实现 前端使用Vue&#xff0c;WebSocket实现前后端通信&#xff0c;后端使用JSch ssh通讯包。 1. 前端核心代码 <template><div class"shell-container"><div id"shell"/>&l…

day01-切片和索引

day01-切片和索引 ndarray对象的内容可以通过索引或切片来访问和修改&#xff0c;与 Python 中list 的切片操作一样。 ndarray数组可以基于0-n的下标进行索引 注意&#xff0c;数组切片并不像列表切片会重新开辟一片空间&#xff0c;而是地址引用&#xff0c;需要使用.copy()…

【Threejs进阶教程-着色器篇】2. Uniform的基本用法与Uniform的调试

Uniform的基本用法与Uniform的调试 关于本Shader教程优化上一篇的效果优化光栅栏高度让透明度和颜色变的更平滑pow()函数借助数学工具更好的理解函数 Unifoms简介编写uniforms修改片元着色器代码借助lil.gui调试uniforms使用uniform控制颜色继续在uniforms添加颜色在着色器中接…

动态住宅代理IP的3个优点

在大数据时代的背景下&#xff0c;代理IP成为了很多企业顺利开展的重要工具。代理IP地址可以分为住宅代理IP地址和数据中心代理IP地址。选择住宅代理IP的好处是可以实现真正的高匿名性&#xff0c;而使用数据中心代理IP可能会暴露自己使用代理的情况。 住宅代理IP是指互联网服务…

JavaScript中location对象的主要属性和方法

属性 href&#xff1a;获取或设置整个URL。protocol&#xff1a;获取URL的协议部分&#xff0c;如"http:"或"https:"。host&#xff1a;获取URL的主机名&#xff08;包括端口号&#xff0c;如果有的话&#xff09;。hostname&#xff1a;获取URL的主机名&…

Android studio 打包低版本的Android项目报错

一、报错内容 Execution failed for task :app:packageRelease. > A failure occurred while executing com.android.build.gradle.internal.tasks.Workers$ActionFacade> com.android.ide.common.signing.KeytoolException: Failed to read key key0 from store "…

【Portswigger 学院】路径遍历

路径遍历&#xff08;Path traversal&#xff09;又称目录遍历&#xff08;Directory traversal&#xff09;&#xff0c;允许攻击者通过应用程序读取或写入服务器上的任意文件&#xff0c;例如读取应用程序源代码和数据、凭证和操作系统文件&#xff0c;或写入应用程序所访问或…

10 Posix API与网络协议栈

POSIX概念 POSIX是由IEEE指定的一系列标准,用于澄清和统一Unix-y操作系统提供的应用程序编程接口(以及辅助问题,如命令行shell实用程序),当您编写程序以依赖POSIX标准时,您可以非常肯定能够轻松地将它们移植到大量的Unix衍生产品系列中(包括Linux,但不限于此!)。 如…

奇瑞被曝强制加班,“896”成常态且没有加班费

ChatGPT狂飙160天&#xff0c;世界已经不是之前的样子。 更多资源欢迎关注 7 月 2 日消息&#xff0c;一位认证为“奇瑞员工”的网友近期发帖引发热议&#xff0c;奇瑞汽车内部存在强制加班行为&#xff0c;每周加班时长需大于 20 小时并且没有加班费&#xff0c;仅补贴 10 元…

人口萎缩,韩国釜山“进入消失阶段”

KlipC报道&#xff1a;调查显示&#xff0c;随着低生育率和人口老化&#xff0c;釜山人口逐渐萎缩&#xff0c;韩国第二大城市釜山显现出“进入消失阶段”的迹象。 据悉&#xff0c;“消失风险指数”是将20岁至39岁女性人口总数除以65岁及以上人口得到的数值。当该指数大于1.5…

第T3周:天气识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、前期工作 本文将采用CNN实现多云、下雨、晴、日出四种天气状态的识别。较上篇文章&#xff0c;本文为了增加模型的泛化能力&#xff0c;新增了Dropout层并…

【计算机体系结构】缓存的false sharing

在介绍缓存的false sharing之前&#xff0c;本文先介绍一下多核系统中缓存一致性是如何维护的。 目前主流的多核系统中的缓存一致性协议是MESI协议及其衍生协议。 MESI协议 MESI协议的4种状态 MESI协议有4种状态。MESI是4种状态的首字母缩写&#xff0c;缓存行的4种状态分别…