【Redis】Redis 的学习教程(九)之 发布 Pub、订阅 Sub

news2025/1/11 0:32:31

1. Pub/Sub 介绍

Redis 的发布订阅(Pub/Sub)模式是一种消息传递机制,它允许在发送者和接收者之间建立松耦合的通信关系。在这种模式中,发送者(发布者)将消息发布到一个指定的频道或模式,而接收者(订阅者)可以订阅一个或多个频道,以便接收发布的消息。

以下是Redis发布订阅模式的主要组件:

  • 发布者(Publisher):发布者是产生并发布消息的实体。它可以将消息发送到指定的频道或模式。
  • 订阅者(Subscriber):订阅者是接收并处理消息的实体。它可以订阅一个或多个频道或模式,以便接收相关的消息。
  • 频道(Channel):频道是发布者和订阅者之间的通信渠道。发布者将消息发送到频道,而订阅者从频道接收消息。

可以看下图,Publisher 和 Subscriber、Channel 的关系很清晰:

在这里插入图片描述
发布者往 “Channel A” 通道发布消息:Hello World!,消息的所有订阅者就会收到这个消息

2. 使用 Pub/Sub 实现发布订阅

Redis实现 发布/订阅 一共有两种模式:

  1. 使用频道(Channel)进行发布订阅
  2. 使用模式(Pattern)进行发布订阅

Redis 可以支持多个数据库,每个数据库都有自己的命名空间和数据。通过使用多个数据库,可以实现数据隔离、分区和组织

但是值得注意的是:这种发布订阅机制与 数据分区空间无关,比如在 db 0 发布消息, 其他区的订阅者都会收到消息

Redis 使用以下命令操作 Pub/Sub 工作:

  • SUBSCRIBE:订阅一个或多个频道
    • 语法:SUBSCRIBE channel [channel ...]
  • UNSUBSCRIBE :取消订阅一个或多个频道
    • 语法:UNSUBSCRIBE [channel [channel ...]]
  • PSUBSCRIBE:订阅一个或多个模式
    • 语法:PSUBSCRIBE pattern [pattern ...]
  • PUNSUBSCRIBE取消订阅一个或多个模式
    • 语法:PUNSUBSCRIBE [pattern [pattern ...]]
  • PUBSUB CHANNELS [pattern]:列出活跃的 channel
  • PUBSUB NUMSUB [channel-1 ... channel-N]:列出 channel 的订阅者个数

2.1 通过频道(Channel)进行发布订阅

通过频道(Channel)进行发布订阅过程如下:

  1. Subscriber 订阅某个 Channel,实现对 Channel 的监听
  2. Publisher 对 Channel 这个服务中心媒介发布消息
  3. 所有订阅 Channel 的 Subscriber 接收到消息

2.1.1 订阅者订阅频道

订阅后:

在这里插入图片描述
使用客户端 [subscriber A] 订阅 Channel [mychannel] 来接收消息。从上面可以看出响应的信息:

  • “subscribe” :消息类型,枚举是 subscribe、message、unsubscribe
  • “mychannel” :频道的名称
  • 最后的消息内容:不同的消息类型代表不同含义。

进入订阅后的客户端可以收到 3 种枚举类型的消息:

  • subscribe:订阅成功的消息类型,第 2 个值是订阅成功的频道名称,第 3 个值是当前客户端订阅的频道数量。
  • message:客户端接收消息的消息类型,第 2 个值表示产生消息的频道名称,第 3 个值是消息的内容。
  • unsubscribe:取消订阅的消息类型,第 2 个值是对应的频道名称,第 3 个值是当前客户端订阅的频道数量。值为 0 时说明客户端一个订阅的都没有了,退出订阅状态。

2.1.2 发布者发布消息

发布消息:

在这里插入图片描述
发布的消息并不会持久化存储下来,所以消息发布之后被某个 Subcriber 订阅到的话,消息生命周期基本就完成了

2.1.3 订阅者接收消息

想要收到上面 发布者发布的消息,我们的客户端首先需要关注了 [mychannel] 频道,才能收到 “Hello, World!” 这条消息

在这里插入图片描述

2.1.4 退订频道

如果你不想收到某个频道的消息了,你可以取消预订

2.2 使用模式(Pattern)匹配实现发布订阅

来看看另一种实现发布订阅的方案 ,就是模式匹配的方式:除了直接订阅的客户端之外,还会检查是否有与我们模式相匹配的 Channel,如果有,消息也会发布到对应匹配的频道上,订阅这个 Channel 的客户端也会收到消息

如下图:

在这里插入图片描述
当 Message.Queue.Area1 频道接收到消息之后,除了订阅自身频道的 Actor A 和 Actor B 能收到消息之外。因为频道与模式匹配成功,消息还会发送给订阅 Message.Queue.* 模式的所有人员。

在这里插入图片描述
因为使用匹配模式,PUBLISH 消息发布到 Message.Queue.Area2 之外,还会将该 Channel 与匹配模式的Channel进行对比,如果 Channel 与某个模式匹配的话,也将这个消息发布到订阅这个模式的客户端。

所以图中红色线条部分,包括 Actor C、Actor D、Actor E 都接受到了消息

2.2.1 订阅者订阅频道

Client A 订阅 Message.Queue.Area1:

在这里插入图片描述

Client B 订阅 Message.Queue.Area2:

在这里插入图片描述
Client C 订阅 Message.Queue.*:

在这里插入图片描述

2.2.2 发布者发布消息

在这里插入图片描述

2.2.3 订阅者接收消息

对应频道的订阅者收到消息(Client A ):

在这里插入图片描述
匹配模式的订阅者收到消息(Client C):

在这里插入图片描述

因为没有筛重策略,所以如果你既订阅了匹配模式(如 Message.Queue.* ),又订阅了对应的频道(如 Message.Queue.Area2),那么你的客户端会收到两条同样的消息,一条消息类型是message,一条类型是 pmessage

3. SpringBoot 整合 Redis 实现发布订阅模式

3.1 概述

订阅消息就是接收消息,这个比较复杂。既有对 Redis 连接的管理,也有对消费消息的线程池的管理。不过 Spring 已经把这个“重活”给干了。

Spring 提供了一个全套的解决方案,这里面包括:

  1. 订阅/取消订阅这些相关的用户操作
  2. 接收所有来自Redis的消息
  3. 把这些消息按照订阅关系分发给具体的消费者
  4. 触发消费消息的回调代码在线程池中运行

由于 Spring 已经全权代理,用户只需要提供要消费的 topic 以及对应的消费回调代码即可。

我们需要了解Spring提供的几个接口和类,才可以很好的使用:

  1. Topic 接口,表示一个订阅对象:它有两个实现类,ChannelTopicPatternTopic,前者对应 redis的 channel,后者对应 redis 的 pattern
  2. MessageListener 接口,回调接口,通过它来执行业务代码
  3. Message 接口,表示从 redis 接收到的消息
  4. RedisMessageListenerContainer 类,这个核心类,相当于一个代理,就是它负责接收 redis 的消息,并分发给 MessageListener
  5. RedisConnectionFactoryRedisMessageListenerContainer 需要此类 RedisConnectionFactory:redis连接工厂,用来获取一个redis连接,由于这个连接用于接收消息,所以它是一直阻塞着的
  6. 还可以为这个类指定一个 Executor,即线程池,这不是必须的,如果不指定它会生成一个默认的

SpringBoot集成Redis Messaging (Pub/Sub)

3.2 在 Springboot 中使用发布订阅

先说下在 springboot 中使用 redis 的发布订阅的步骤:

  1. 配置消息监听类(实现 MessageListener 接口,重写 onMessage() 方法)。
  2. 添加监听容器(配置 RedisMessageListenerContainer)。
  3. 订阅频道。
  4. 向频道发布消息。

3.2.1 配置消息监听类

添加一个订单消息监听器:

@Component
public class OrderSubscriber implements MessageListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 获取消息
        byte[] messageBody = message.getBody();
        // 使用值序列化器转换
        Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
        // 获取监听的频道
        byte[] channelByte = message.getChannel();
        // 使用字符串序列化器转换
        Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
        // 渠道名称转换
        String patternStr = new String(pattern);
        System.out.println(patternStr);
        System.out.println("---频道---: " + channel);
        System.out.println("---消息内容---: " + msg);
    }

}

3.2.2 容器添加监听器、订阅频道

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        // json 序列化配置
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // String 序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // 所有的 key 采用 string 的序列化
        template.setKeySerializer(stringRedisSerializer);
        // 所有的 value 采用 jackson 的序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash 的 key 采用 string 的序列化
        template.setHashKeySerializer(stringRedisSerializer);
        // hash 的 value 采用 jackson 的序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, OrderSubscriber orderSubscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置连接工厂
        container.setConnectionFactory(redisConnectionFactory);
        // 监听器订阅频道
        container.addMessageListener(orderSubscriber, new ChannelTopic("order"));
        container.addMessageListener(orderSubscriber, new ChannelTopic("sms"));
        // 序列化
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        container.setTopicSerializer(seria);
        return container;
    }

}

3.2.3.1 容器添加多个监听器

添加一个短信监听器:

@Component
public class SmsSubscriber implements MessageListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 获取消息
        byte[] messageBody = message.getBody();
        // 使用值序列化器转换
    }
}

修改配置:

@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, OrderSubscriber orderSubscriber, SmsSubscriber smsSubscriber) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    // 设置连接工厂
    container.setConnectionFactory(redisConnectionFactory);
    // 监听器订阅频道
    container.addMessageListener(orderSubscriber, Arrays.asList(new ChannelTopic("order"), new ChannelTopic("sms")));
    container.addMessageListener(smsSubscriber, new ChannelTopic("sms"));
    // 序列化
    Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    seria.setObjectMapper(objectMapper);
    container.setTopicSerializer(seria);
    return container;
}

3.2.3.2 使用 PatternTopic

container.addMessageListener(smsSubscriber, new PatternTopic("redis.*"));

3.2.3 向频道发布消息

@RestController
@RequestMapping("/pub")
public class PubController {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @GetMapping("/publish")
    public String publish() {
        redisTemplate.convertAndSend("order", "该订单已过期");
        redisTemplate.convertAndSend("sms", "该短信已发送");
        return "publish";
    }

}

3.3 使用 MessageListenerAdapter 实现发布订阅

1、定义 一个消息接受类

@Component
public class OrderMessageReceiver {

    public void receiveMessage(String message, String channel){
        System.out.println("---频道---: " + channel);
        System.out.println("---消息内容---: " + message);
    }
}

2、配置一个 MessageListenerAdapter

@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter adapter, SmsSubscriber smsSubscriber) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    // 设置连接工厂
    container.setConnectionFactory(redisConnectionFactory);
    // 监听器订阅频道
    container.addMessageListener(adapter, Arrays.asList(new ChannelTopic("order"), new ChannelTopic("sms")));
    container.addMessageListener(smsSubscriber, new PatternTopic("redis.*"));
    // 序列化
    Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    seria.setObjectMapper(objectMapper);
    container.setTopicSerializer(seria);
    return container;
}

@Bean
public MessageListenerAdapter smsExpirationListener(OrderMessageReceiver messageListener) {
    MessageListenerAdapter receiveMessage = new MessageListenerAdapter(messageListener, "receiveMessage");
    // 序列化
    Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    seria.setObjectMapper(objectMapper);
    receiveMessage.setSerializer(seria);
    return receiveMessage;
}

Spring boot整合Redis实现发布订阅(超详细)
springboot中使用redis发布订阅

4. 总结

当使用 Pattern 进行发布订阅的时候。如果有消息发布出来,除了订阅该 Channel 的 Client 之外,所有订阅了与 Channel 匹配的模式的 Client 同样会收到消息。

另外,Redis 发布订阅的消息不会被持久化,所以无历史消息,也不支持 ACK 机制,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/999441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用户体验设计师是什么,一篇文章读懂!

我是设计师l1m0&#xff0c;今天要给大家分享一个有趣的职业&#xff1a;UX设计师。 在我们日常生活中&#xff0c;我们无时无刻都在与产品发生交互行为&#xff0c;例如使用应用APP、访问网站、与实体陈燕萍进行交互&#xff08;例如试穿衣服&#xff09;或者享受某个服务&am…

恒运资本:多股涨停!“吃药”行情卷土重来;政策利好,元宇宙又可以了!

今日早盘&#xff0c;A股小幅震荡反弹&#xff0c;科创50指数继续围绕900点打开争夺。 盘面上&#xff0c;医药&#xff0c;轿车、元世界、煤炭等板块涨幅居前&#xff0c;航空、家居用品、卫星导航、房地产等板块跌幅居前。北上资金净流出4.4亿元。 医药股全线走强 医药股早…

12个小朋友手拉手站成一个圆圈 约瑟夫环 + 字节历险记

目录 12个小朋友手拉手站成一个圆圈&#xff0c;从某一个小朋友开始报数&#xff0c;报到7的那个小朋友退到圈外&#xff0c;然后他的下一位重新报“1”。这样继续下去&#xff0c;直到最后只剩下一个小朋友求解这个小朋友原来站在什么位置上呢? 请问在互联网公司中,OKR是什…

C语言访问Mysql

文章目录 C语言访问Mysql1. 环境设置2. mysql接口介绍(1) 初始化mysql_init()(2) 链接数据库mysql_real_connect(3) 下发mysql命令mysql_query()(4) 获取执行结果mysql_store_result(5) 释放结果集mysql_free_result()(6) 获取结果行数mysql_num_rows(7) 获取结果列数mysql_num…

Java——》synchronized互斥性

推荐链接&#xff1a; 总结——》【Java】 总结——》【Mysql】 总结——》【Redis】 总结——》【Kafka】 总结——》【Spring】 总结——》【SpringBoot】 总结——》【MyBatis、MyBatis-Plus】 总结——》【Linux】 总结——》【MongoD…

Solidity 小白教程:13. 继承

Solidity 小白教程&#xff1a;13. 继承 这一讲&#xff0c;我们介绍solidity中的继承&#xff08;inheritance&#xff09;&#xff0c;包括简单继承&#xff0c;多重继承&#xff0c;以及修饰器&#xff08;modifier&#xff09;和构造函数&#xff08;constructor&#xff…

新手必看!Python爬虫 教程:IP池的使用

前言 嗨喽~大家好呀&#xff0c;这里是小曼呐 ❤ ~! 一、简介 爬虫中为什么需要使用代理 一些网站会有相应的反爬虫措施&#xff0c;例如很多网站会检测某一段时间某个IP的访问次数&#xff0c;如果访问频率太快以至于看起来不像正常访客&#xff0c;它可能就会禁止这个IP的访…

CE单相智能电力仪表ADL200

安科瑞 华楠 ADL200 单相电子式电能表主要用于计量低压网络的单相有功电能&#xff0c;同时可测量电压、电流、功率等电量&#xff0c; 并可选配 RS485 通讯功能&#xff0c;方便用户进行用电监测、集抄和管理。可灵活安装于配电箱内&#xff0c;实现对不同区域和不 同 负 荷 …

分布式系统第三讲:全局唯一ID实现方案

分布式系统第三讲&#xff1a;全局唯一ID实现方案 本文主要介绍常见的分布式ID生成方式&#xff0c;大致分类的话可以分为两类&#xff1a;一种是类DB型的&#xff0c;根据设置不同起始值和步长来实现趋势递增&#xff0c;需要考虑服务的容错性和可用性; 另一种是类snowflake型…

信息化发展34

IT 审计目的 1 、IT 审计的目的是指通过开展IT 审计工作&#xff0c; 了解组织IT 系统与IT 活动的总体状况&#xff0c; 对组织是否实现口目标进行审查和评价&#xff0c; 充分识别与评估相关口风险&#xff0c;提出评价意见及改进建议&#xff0c; 促进组织实现IT 目标。 2 、…

禁用或卸载没那么复杂!如何在Windows 11上禁用或删除McAfee

​当你从Microsoft以外的制造商处购买新的Windows设备时,该设备可能已安装McAfee防病毒软件。虽然在你的电脑上安装防病毒软件总是一个好主意,但你可能更喜欢与McAfee不同的程序。在这种情况下,了解如何卸载McAfee是很重要的。 另一方面,你可能对McAfee很满意,但需要暂时…

忽悠苹果,销量垫底?小屏iPhone已经落寞,iPhone13mini即将停产

苹果公司的小屏iPhone系列可能即将走到尽头。据彭博社报道&#xff0c;苹果最新发布的 iPhone 13 mini 库存已经见底&#xff0c;部分款式在美国官网发货时间显示为2-3周&#xff0c;甚至最久达到了6-8周。这种现象表明&#xff0c;在周二的发布活动结束后&#xff0c;苹果最后…

SpringBoot底层注解

文章目录 前言一、Configuration二、Import导入组件三、Conditional条件装配四、ImportResource导入Spring配置文件五、ConfigurationProperties配置绑定总结 前言 本文主要讲诉Configuration、Import、Conditional、ImportResource、ConfigurationProperties注解。 先将实体…

一心堂正式启动“心链-SRM”项目,携手企企通搭建引领发展的采购供应链协同平台

近日&#xff0c;中国药品连锁零售行业首家上市企业一心堂药业集团股份有限公司&#xff08;以下简称“一心堂”&#xff09;与企企通成功召开“心链-SRM”采购供应链管理项目启动会。双方高层领导及项目团队关键成员出席现场&#xff0c;各子公司管理团队通过视频会议形式参与…

双碳目标下基于“遥感+”集成技术的碳储量、碳排放、碳循环、温室气体等多领域监测与模拟实践

卫星遥感具有客观、连续、稳定、大范围、重复观测的优点&#xff0c;已成为监测全球碳盘查不可或缺的技术手段&#xff0c;卫星遥感也正在成为新一代 、国际认可的全球碳核查方法。目的就是梳理碳中和与碳达峰对卫星遥感的现实需求&#xff0c;系统总结遥感技术在生态系统碳储量…

利用VB宏设置将多个excel表合并为一个

多个excel合并成一个excel文件-应用哥科技 Sub MergeExcelFiles() 声明变量 Dim MyFile As Variant Dim MySheet As Worksheet, MySheet1 As Worksheet Dim LastRow As Long, LastRow1 As Long Dim NextRow As Long 设置初始值 NextRow 1 打开多个Excel文件 MyFile Applicati…

【算法】分治法的应用——快速排序

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 给大家跳段街舞感谢支持&#xff01;ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ …

30岁+文转码程序媛求职路复盘:也算是逆袭了!

这篇文章来自一位群友的分享&#xff1a; 这篇文章写于下班路上&#xff0c;刚刚入职不久&#xff0c;我想再冲刺一下大厂&#xff0c;阳哥建议我坚持总结打卡&#xff0c;可以尝试写写博客。 那我就从这篇开始吧&#xff0c;希望开个好头&#xff01; 上班的感觉真好 今天是…

bim与数字孪生智能建造的关系

随着建筑业数字化改革的推进&#xff0c;我们正迈入数字孪生时代&#xff0c;而真正实现建筑物数字孪生的智能建造&#xff0c;其基础前提是建造对象和建造过程的高度数字化&#xff0c;这样一个过程唯有依托BIM建立数据模型才能实现&#xff0c;真正达到智能建造或智慧运维。 …

内网穿透的应用-如何搭建WordPress博客网站,并且发布至公网上?

文章目录 如何搭建WordPress博客网站&#xff0c;并且发布至公网上&#xff1f;概述前置准备1 安装数据库管理工具1.1 安装图形图数据库管理工具&#xff0c;SQL_Front 2 创建一个新数据库2.1 创建数据库2.2 为数据库创建一个用户 3 安装PHP7.44. 创建一个新站点4.1 创建站点根…