redis原理 6:小道消息 —— PubSub

news2025/1/11 20:01:07

前面我们讲了 Redis 消息队列的使用方法,但是没有提到 Redis 消息队列的不足之处,那就是它不支持消息的多播机制

img
img

消息多播

消息多播允许生产者生产一次消息,中间件负责将消息复制到多个消息队列,每个消息队列由相应的消费组进行消费。它是分布式系统常用的一种解耦方式,用于将多个消费组的逻辑进行拆分。支持了消息多播,多个消费组的逻辑就可以放到不同的子系统中。

如果是普通的消息队列,就得将多个不同的消费组逻辑串接起来放在一个子系统中,进行连续消费。

img
img

PubSub

为了支持消息多播,Redis 不能再依赖于那 5 种基本数据类型了。它单独使用了一个模块来支持消息多播,这个模块的名字叫着 PubSub,也就是 PublisherSubscriber,发布者订阅者模型。我们使用 Python 语言来演示一下 PubSub 如何使用。

python复制代码# -*- coding: utf-8 -*-
import time
import redis

client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("codehole")
time.sleep(1)
print p.get_message()
client.publish("codehole""java comes")
time.sleep(1)
print p.get_message()
client.publish("codehole""python comes")
time.sleep(1)
print p.get_message()
print p.get_message()
python复制代码{'pattern'None'type''subscribe''channel''codehole''data'1L}
{'pattern'None'type''message''channel''codehole''data''java comes'}
{'pattern'None'type''message''channel''codehole''data''python comes'}
None

img 客户端发起订阅命令后,Redis 会立即给予一个反馈消息通知订阅成功。因为有网络传输延迟,在 subscribe 命令发出后,需要休眠一会,再通过 get\_message 才能拿到反馈消息。客户端接下来执行发布命令,发布了一条消息。同样因为网络延迟,在 publish 命令发出后,需要休眠一会,再通过 get\_message 才能拿到发布的消息。如果当前没有消息,get\_message 会返回空,告知当前没有消息,所以它不是阻塞的。

Redis PubSub 的生产者和消费者是不同的连接,也就是上面这个例子实际上使用了两个 Redis 的连接。这是必须的,因为 Redis 不允许连接在 subscribe 等待消息时还要进行其它的操作。

在生产环境中,我们很少将生产者和消费者放在同一个线程里。如果它们真要在同一个线程里,何必通过中间件来流转,直接使用函数调用就行。所以我们应该将生产者和消费者分离,接下来我们看看分离后的代码要怎么写。

消费者

py复制代码# -*- coding: utf-8 -*-
import time
import redis

client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("codehole")
while True:
    msg = p.get_message()
    if not msg:
        time.sleep(1)
        continue
    print msg

生产者

py复制代码# -*- coding: utf-8 -*-
import redis

client = redis.StrictRedis()
client.publish("codehole""python comes")
client.publish("codehole""java comes")
client.publish("codehole""golang comes")

必须先启动消费者,然后再执行生产者,消费者我们可以启动多个,pubsub 会保证它们收到的是相同的消息序列。

python复制代码{'pattern'None'type''subscribe''channel''codehole''data'1L}
{'pattern'None'type''message''channel''codehole''data''python comes'}
{'pattern'None'type''message''channel''codehole''data''java comes'}
{'pattern'None'type''message''channel''codehole''data''golang comes'}

我们从消费者的控制台窗口可以看到上面的输出,每个消费者窗口都是同样的输出。第一行是订阅成功消息,它很快就会输出,后面的三行会在生产者进程执行的时候立即输出。 上面的消费者是通过轮询 get_message 来收取消息的,如果收取不到就休眠 1s。这让我们想起了第 3 节的消息队列模型,我们使用 blpop 来代替休眠来提高消息处理的及时性。

PubSub 的消费者如果使用休眠的方式来轮询消息,也会遭遇消息处理不及时的问题。不过我们可以使用 listen 来阻塞监听消息来进行处理,这点同 blpop 原理是一样的。下面我们改造一下消费者

阻塞消费者

py复制代码# -*- coding: utf-8 -*-
import time
import redis

client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("codehole")
for msg in p.listen():
    print msg

代码简短了很多,不需要再休眠了,消息处理也及时了。

模式订阅

上面提到的订阅模式是基于名称订阅的,消费者订阅一个主题是必须明确指定主题的名称。如果我们想要订阅多个主题,那就 subscribe 多个名称。

sh复制代码> subscribe codehole.image codehole.text codehole.blog  # 同时订阅三个主题,会有三条订阅成功反馈信息
1) "subscribe"
2) "codehole.image"
3) (integer) 1
1) "subscribe"
2) "codehole.text"
3) (integer) 2
1) "subscribe"
2) "codehole.blog"
3) (integer) 3

这样生产者向这三个主题发布的消息,这个消费者都可以接收到。

sh复制代码> publish codehole.image https://www.google.com/dudo.png
(integer) 1
> publish codehole.text " 你好,欢迎加入码洞 "
(integer) 1
> publish codehole.blog '{"content": "hello, everyone", "title": "welcome"}'
(integer) 1

如果现在要增加一个主题codehole.group,客户端必须也跟着增加一个订阅指令才可以收到新开主题的消息推送。

为了简化订阅的繁琐,redis 提供了模式订阅功能Pattern Subscribe,这样就可以一次订阅多个主题,即使生产者新增加了同模式的主题,消费者也可以立即收到消息

bash复制代码> psubscribe codehole.*  # 用模式匹配一次订阅多个主题,主题以 codehole. 字符开头的消息都可以收到
1) "psubscribe"
2) "codehole.*"
3) (integer) 1

消息结构

前面的消费者消息输出时都是下面的这样一个字典形式

python复制代码{'pattern'None'type''subscribe''channel''codehole''data'1L}
{'pattern'None'type''message''channel''codehole''data''python comes'}
{'pattern'None'type''message''channel''codehole''data''java comes'}
{'pattern'None'type''message''channel''codehole''data''golang comes'}

那这几个字段是什么含义呢?

data 这个毫无疑问就是消息的内容,一个字符串。

channel 这个也很明显,它表示当前订阅的主题名称。

type 它表示消息的类型,如果是一个普通的消息,那么类型就是 message,如果是控制消息,比如订阅指令的反馈,它的类型就是 subscribe,如果是模式订阅的反馈,它的类型就是 psubscribe,还有取消订阅指令的反馈 unsubscribe 和 punsubscribe。

pattern 它表示当前消息是使用哪种模式订阅到的,如果是通过 subscribe 指令订阅的,那么这个字段就是空。

PubSub 缺点

PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。

如果 Redis 停机重启,PubSub 的消息是不会持久化的,毕竟 Redis 宕机就相当于一个消费者都没有,所有的消息直接被丢弃。

正是因为 PubSub 有这些缺点,它几乎找不到合适的应用场景。所以 Redis 的作者单独开启了一个项目 Disque 专门用来做多播消息队列。该项目目前没有成熟,一直长期处于 Beta 版本,但是相应的客户端 sdk 已经非常丰富了,就待 Redis 作者临门一脚发布一个 Release 版本。关于 Disque 的更多细节,本小册不会多做详细介绍,感兴趣的同学可以去阅读相关文档。

补充

近期 Redis5.0 新增了 Stream 数据结构,这个功能给 Redis 带来了持久化消息队列,从此 PubSub 可以消失了,Disqueue 估计也永远发不出它的 Release 版本了。

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/834347.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OPENCV C++(一) 二进制和灰度原理 处理每个像素点值的方法

#include <opencv2/opencv.hpp> using namespace std; using namespace cv;必须包含的头文件&#xff01; 才能开始编写代码 读取相片 一般来说加个保护程序 不至于出error和卡死 Mat image imread("test.webp"); //存放自己图像的路径 if (image.empty()){p…

虚拟机网络图标不见了

有3台虚拟机之前正常运行的&#xff0c;有一天打开虚拟机发现2台虚拟机的网络连接图标不见了&#xff0c;也ping不通另外两台。 解决&#xff1a;在终端执行以下命令&#xff0c;即可ping通 [roothadoop103 ~]# sudo nmcli network off [roothadoop103 ~]# sudo nmcli network…

大数据Flink(五十六):Standalone伪分布环境(开发测试)

文章目录 Standalone伪分布环境(开发测试) 一、架构图 二、环境准备 三、下载安装包</

Android LinearLayout dynamic add child ImageView,Glide load,kotlin

Android LinearLayout dynamic add child ImageView&#xff0c;Glide load&#xff0c;kotlin images.xml <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"andro…

postgresql|数据库|MySQL数据库向postgresql数据库迁移的工具pgloader的部署和初步使用

前言&#xff1a; MySQL数据库和postgresql数据库之间的差异并不多&#xff0c;这里的差异指的是对SQL语言的支持两者并不大&#xff0c;但底层的东西差异是非常多的&#xff0c;例如&#xff0c;MySQL的innodb引擎概念&#xff0c;数据库用户管理&#xff0c;这些和postgresq…

循环队列——数据结构与算法

&#x1f636;‍&#x1f32b;️Take your time ! &#x1f636;‍&#x1f32b;️ &#x1f4a5;个人主页&#xff1a;&#x1f525;&#x1f525;&#x1f525;大魔王&#x1f525;&#x1f525;&#x1f525; &#x1f4a5;代码仓库&#xff1a;&#x1f525;&#x1f525;魔…

24考研数据结构-哈夫曼树与哈夫曼编码

这里写目录标题 5.5树与二叉树的应用5.5.1 哈夫曼树和哈夫曼编码1. 带权路径长度的定义2. 哈夫曼树的定义&#xff08;最优二叉树&#xff0c;不唯一&#xff09;3. 哈夫曼树的构造4. 哈夫曼树的特点5.哈夫曼编码&#xff08;最短二进制前缀编码&#xff09; 数据结构&#xff…

Windows terminal 添加 git bash 解决git中文乱码显示问题

Windows terminal 添加 git bash 解决git中文乱码显示问题 在 windows terminal 中配置git 说明&#xff1a; 点击箭头选择设置 说明&#xff1a; 点击"添加新配置文件"配置名称命令行&#xff0c;可执行文件的具体语句 C:\Program Files\Git\bin\bash.exe启动目录…

【100天精通python】Day25:python的编程方式以及并发编程详解

目录 专栏导读 1 python的编程方式 2 顺序编程 3 面向对象编程 4 函数式编程 5 并发编程 5.1 多线程编程 threading模块常用用法 1 创建线程&#xff1a; 2 启动线程&#xff1a; 3 等待线程执行完毕&#xff1a; 4 获取当前活动线程数量&#xff1a; 5 获取当前线程…

elasticSearch常见的面试题

常见的面试问题 描述使用场景 es集群架构3个节点&#xff0c;根据不同的服务创建不同的索引&#xff0c;根据日期和环境&#xff0c;平均每天递增60*2&#xff0c;大约60Gb的数据。 调优技巧 原文参考&#xff1a;干货 | BAT等一线大厂 Elasticsearch面试题解读 - 掘金 设计阶…

Mac 安装不在 Apple 商店授权的应用程序

文章目录 一、场景介绍二、实操说明 一、场景介绍 在日常的工作生活中&#xff0c;发现一些好用的应用程序&#xff0c;但是出于某些原因&#xff0c;应用程序的开发者并没有将安装包上架到苹果商店。 那么这些优秀的应用程序下载安装以后就会出现如下弹框被拒之门外 二、实操…

【react】react生命周期钩子函数:

文章目录 一、生命周期概念:二、生命周期:三、挂载阶段&#xff08;constructor > render > componentDidMount&#xff09;&#xff1a;四、更新阶段&#xff08;render > componentDidUpdate&#xff09;&#xff1a;五、卸载阶段&#xff08;componentWillUnmount …

基于STM32+微信小程序设计的个人健康助手(腾讯云IOT)

一、设计需求 1.1 项目背景 21世纪,社会高速发展,生活物质越来越丰富。为了追求更高的物质享受,人们不断消耗人体健康机制去拼搏,导致身体抵抗能力下降,引发各种疾病。因此,身体健康状况越来越备受大家的关注,健康意识也得到普遍提高。正常的体温是保障人体内部器官工…

【雕爷学编程】MicroPython动手做(36)——MixPY之Hello world 2

MixPY——让爱(AI)触手可及 主控芯片&#xff1a;K210&#xff08;64位双核带硬件FPU和卷积加速器的 RISC-V CPU&#xff09; 显示屏&#xff1a;LCD_2.8寸 320*240分辨率&#xff0c;支持电阻触摸 摄像头&#xff1a;OV2640&#xff0c;200W像素 扬声器&#xff1a;3W单…

Linux中提示No such file or directory解决方法

说明&#xff1a; 在linux下&#xff0c;./xxx.sh执行shell脚本时会提示No such file or directory。但shell明明存在&#xff0c;为什么就是会提示这个呢&#xff1f; 这种其实是因为编码方式不对&#xff0c;如你在win下编辑sh&#xff0c;然后直接复制到linux下面 实现&…

使用idea如何生成webservice客户端

需求阐述 在和外围系统对接的时候&#xff0c;对方只给了wsdl地址&#xff0c;记得之前了解到的webservice&#xff0c;可以用idea生成客户端代码。先记录生成的步骤 使用idea如何生成webservice客户端 1.创建一个Java项目 2.第二步生成代码 我的idea再右键要生成文件目录里…

使用 LangChain 搭建基于 Amazon DynamoDB 的大语言模型应用

LangChain 是一个旨在简化使用大型语言模型创建应用程序的框架。作为语言模型集成框架&#xff0c;在这个应用场景中&#xff0c;LangChain 将与 Amazon DynamoDB 紧密结合&#xff0c;构建一个完整的基于大语言模型的聊天应用。 本次活动&#xff0c;我们特意邀请了亚马逊云科…

stm32与上位机电脑间最快的通信方式是什么?

对于小型多关节机械臂的控制电路设计&#xff0c;选择合适的通信方式可以提高MCU与上位机之间的实时性。以下是一些在STM32上常用的通信方式&#xff0c;你可以根据你的具体需求选择适合的&#xff1a; 串口通信&#xff08;UART&#xff09;&#xff1a;串口通信是一种常见的…

小米平板6将推14英寸版!与MIX Fold 3同步推出

今天&#xff0c;知名数码博主数码闲聊站爆料消息&#xff0c;称小米平板6将推出一款Max版本&#xff0c;预计与小米MIX Fold 3同步推出。 据介绍&#xff0c;小米平板6 Max将是小米首款14英寸大屏的旗舰平板&#xff0c;平板搭载骁龙8处理器&#xff0c;在性能释放、影音表现、…

硅谷AI启示录,中国式AI 避坑指南

点击关注 ​ 《AI未来指北》栏目由腾讯新闻推出&#xff0c;邀约全球业内专家、创业者、投资人&#xff0c;探讨AI领域的技术发展、商业模式、应用场景及治理挑战&#xff0c;本期聚焦硅谷近期AI投融资现状。 文丨郝 鑫 编丨苏扬、刘雨琦、王一粟 一扫去年裁员、股价暴跌的阴霾…