Redis 发布订阅机制深入探索

news2025/2/23 19:59:14

Redis 的发布订阅(pub/sub)机制是一种消息传递模式,允许消息的发送者(发布者)和消息的接收者(订阅者)通过一个中介层(频道)进行通信,而无需彼此直接交互。以下是 Redis 发布订阅机制的工作原理的详细解释:

  1. 基于事件驱动的架构
    Redis 服务器使用一个事件驱动的模型来处理所有的网络通信和客户端请求。这种架构允许 Redis 以非阻塞方式高效地处理多个并发连接。

  2. 发布(Publish)
    当一个客户端想要发送消息时,它会将消息发布到一个指定的频道(channel)上。
    发布操作是通过 PUBLISH 命令实现的。
    例如,PUBLISH channel_name message 会将 message 发送到名为 channel_name 的频道。
    在这里插入图片描述
    这里返回的代表有3个监听者

  3. 订阅(Subscribe)
    客户端使用 SUBSCRIBE 命令来监听一个或多个频道的消息。
    当客户端订阅了一个频道后,它会接收到发送到这个频道的所有消息。例如,通过执行 SUBSCRIBE channel_name,客户端就可以监听 channel_name 频道上的消息。

    订阅会返回3个参数:
    1:代表订阅
    2:订阅频道
    3:发布者数量

当发布着发布消息后,订阅者会被推送如下消息
在这里插入图片描述

  1. 消息传递
    一旦有消息被发布到频道上,Redis 服务器会将这个消息分发给所有订阅了该频道的客户端。
    这种消息传递方式是多对多的:多个发布者可以向同一个频道发送消息,而所有订阅该频道的客户端都可以接收到这些消息。
  2. 非持久化的消息
    Redis 发布订阅机制中的消息是非持久化的。这意味着一旦消息被发送,它不会被存储在服务器上,无法被之后订阅该频道的客户端接收。
  3. 多路复用和非阻塞 I/O
    Redis 使用多路复用技术(如 epoll、kqueue)来同时监听多个网络连接,这使得服务器能够同时处理多个发布和订阅操作。
    使用非阻塞 I/O 确保单个操作不会阻塞整个服务器,从而提高整体性能。

在springboot项目中可以如下:
发布:

@Resource
private RedisTemplate redisTemplate;


@PostMapping(value = "/add")
public Result<?> add(@RequestBody SysApplication sysApplication) {
	sysApplicationService.save(sysApplication);
	//这里需要用JSONObject.toJSONString转成string序列化,否则直接上传对象会带上对象路径信息等
	redisTemplate.convertAndSend(RedisListenerConstant.APP_CHANNEL, JSONObject.toJSONString(sysApplication));
	return Result.OK("添加成功!");
}

订阅者: 这里写了2个listener,因为想说明可以在RedisMessageConfig 同时监听多个频道

package com.sungrowpower.redis;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sungrowpower.common.system.dto.SysApplicationPermissionDto;
import com.sungrowpower.service.IGoodsSpuService;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import javax.annotation.Resource;
@Configuration
public class RedisMessageAppListener implements MessageListener {
    @Resource
    private IGoodsSpuService iGoodsSpuService;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // 假设 message 体是一个 JSON 字符串
        String body = new String(message.getBody());
        // 去除类路径
//        String json = body.substring(body.indexOf("{"), body.lastIndexOf("}") + 1);
        String msg= (String) JSON.parse(body);
        SysApplicationPermissionDto app = JSONObject.parseObject(msg, SysApplicationPermissionDto.class);
        System.out.println("s========"+msg);
        try {
        //这里可以调用本身的业务逻辑
            iGoodsSpuService.operateGoodsSpu(app);
        } catch (Exception e) {
            throw new RuntimeException();
        }
    }
}

 
package com.sungrowpower.redis;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sungrowpower.common.exception.AutoBizException;
import com.sungrowpower.common.system.dto.SysPermission;
import com.sungrowpower.service.IGoodsSpuService;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class RedisMessageMenuListener implements MessageListener {

    @Resource
    private IGoodsSpuService iGoodsSpuService;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // 假设 message 体是一个 JSON 字符串
        String body = new String(message.getBody());
        // 去除类路径
//        String json = body.substring(body.indexOf("{"), body.lastIndexOf("}") + 1);
        String msg= (String) JSON.parse(body);
        SysPermission menu = JSONObject.parseObject(msg, SysPermission.class);
        System.out.println("s========"+msg);
        try {
                //这里可以调用本身的业务逻辑
            iGoodsSpuService.operateAttrValue(menu);
        } catch (Exception e) {
            throw new RuntimeException();
        }
    }

}




package com.sungrowpower.redis;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
public class RedisMessageConfig {
    /**
     * 消息监听器适配器menu
     * @return org.springframework.data.redis.listener.adapter.MessageListenerAdapter
     */
    @Bean
    public MessageListenerAdapter listenerAdapterMenu(RedisMessageMenuListener receiver) {
        //这个地方是给 messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“onMessage”
        return new MessageListenerAdapter(receiver, "onMessage");
    }


    /**
     * 消息监听器适配器app
     * @return org.springframework.data.redis.listener.adapter.MessageListenerAdapter
     */
    @Bean
    public MessageListenerAdapter listenerAdapterApp(RedisMessageAppListener receiver) {
        //这个地方是给 messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“onMessage”
        return new MessageListenerAdapter(receiver, "onMessage");
    }

    /**
     * redis消息监听器容器
     * @return org.springframework.data.redis.listener.RedisMessageListenerContainer
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
                                                                       MessageListenerAdapter listenerAdapterMenu,
                                                                       MessageListenerAdapter listenerAdapterApp) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅了一个叫 my-channel 的频道
        container.addMessageListener(listenerAdapterMenu, new PatternTopic("menu_channel"));
        container.addMessageListener(listenerAdapterApp, new PatternTopic("app_channel"));
        // 这个container 可以添加多个 messageListener
        return container;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1269062.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ARM麒麟V10 auditctl启动失败处理

问题&#xff1a; 业务服务器需要启用审计服务&#xff0c;但是启动审计服务失败&#xff0c;查看状态提示audit0。 修改配置文件/boot/efi/EFI/kylin/grub.cfg 删除audit0&#xff0c;或者设置audit1。 重启服务器后验证状态。 auditctl -D echo "-w /data -p rwxa"…

磁环电感参数计算

磁环电感参数计算 1.什么是电感磁饱和2.电感饱和的原因3.电感饱和带来的影响3.1 感应电动势变化3.2 电感值变化3.3 功率损耗增加3.4 系统稳定性受到影响4.饱和电流计算最近在做DC/DC电源,电感是用磁环绕制的,所以关注一下磁环绕制电感参数的计算,学习学习。 某款磁环参数。 …

【10】Python函数专题(上)

目录 1.什么是函数2.函数的参数2.1形参 和 实参2.2函数的传递方式2.3 不定长参数2.3.1 可变位置参数`*args`2.3.2可变关键字参数`**kwargs`2.3.3 小结2.4 参数的解包(也称拆包)1.什么是函数 在Python中,函数是一段可重复使用的代码块,用于执行特定任务。通过将代码封装在函…

C++设计模式——Bridge模式(下)

在上篇 《C设计模式——Bridge模式&#xff08;上&#xff09;》中我们对于桥接模式做了一些介绍。介于桥接模式在实际项目开发中使用广泛&#xff0c;而且也是面试中常问常新的话题。在本篇&#xff0c;我们专注bridge模式在具体的项目开发中的应用&#xff0c;举几个例子来说…

Linux系统安装-以文本模式安装rhel8

文本模式安装提供了用于安装 Red Hat Enterprise Linux 的交互式非图形界面。此安装方法对于没有图形功能的系统很有用。但是&#xff0c;在开始基于文本的安装之前&#xff0c;请务必考虑可用的替代方案。文本模式在安装过程中可以做出的选择数量有限。 目录 交互式文本模式安…

深入理解 Vue 中的指针操作(二)

文章目录 ☘️引言☘️基本用法&#x1f342;v-for指令&#x1f342;v-model指令&#x1f331;v-model适用表单控件 ☘️结论 ☘️引言 Vue.js 是一款非常流行且功能强大的前端框架&#xff0c;它以其响应式的数据绑定和组件化的开发方式赢得了众多开发者的喜爱。而在 Vue.js …

OKCC 客户中心

OKCC服务了这么多家客户中心&#xff0c;但很多小伙伴们其实并不是太了解客户中心的主要功能&#xff0c;那么我今天将从两类客户中心介绍下他们的主要功能。 一、 运营机构客户中心的功能 对于运营机构而言&#xff0c;客户中心的功能包括:能够帮助运营机构提升品牌形象&…

Chrome两个账号数据合并或者转移

文章目录 情况1情况2操作 情况1 「旧」账号&#xff1a;出于种种原因决定停用&#xff0c;但是账号里还有书签、历史记录、密码、设置等数据。 「新」账号&#xff1a;未来的主力账号。 需求是将「旧账号」的数据迁移到「新」账号之中。 情况描述&#xff1a;https://www.z…

openGauss学习笔记-134 openGauss 数据库运维-例行维护-检查操作系统参数

文章目录 openGauss学习笔记-134 openGauss 数据库运维-例行维护-检查操作系统参数134.1 检查办法134.2 异常处理 openGauss学习笔记-134 openGauss 数据库运维-例行维护-检查操作系统参数 134.1 检查办法 通过openGauss提供的gs_checkos工具可以完成操作系统状态检查。 前提…

Golang数据类型(字符串)

字符串重要概念 根据Go语言官方的定义&#xff1a; In Go, a string is in effect a read-only slice of bytes. 意思是Go中的字符串是一组只读的字节切片&#xff08;slice of bytes&#xff09;&#xff0c;每个字符串都使用一个或多个字节表示&#xff08;当字符为 ASCII 码…

医院室内导航解决方案:智慧医疗的重要组成部分

医院作为人们生活中不可或缺的一部分&#xff0c;面临着巨大的挑战。每天都有大量的患者前来就医&#xff0c;而医院内部的复杂结构和科室众多&#xff0c;常常让患者感到困惑和迷失。为了解决这个问题&#xff0c;医院室内导航解决方案应运而生&#xff0c;以其创新的技术和卓…

4G工业路由器智慧楼宇门禁无人值守、实时监控

门禁是我们日常生活中常见的基础设施&#xff0c;就像是现代社会智慧城市中的“门神”&#xff0c;在楼宇管理领域中普遍采用的安防卫士。4G工业路由器的物联网应用则为楼宇门禁管理带来了更加便捷和高效的解决方案。 在传统的楼宇门禁系统中&#xff0c;人员需要手动刷卡、输…

多个加速度计/麦克风连接指引

座舱内的振动投诉&#xff1a;如乘客/驾驶员在车厢内感受到传动轴、方向盘抖动剧烈 图1.三轴模式下的单个加速度计 图2.软件设置界面 如果您只有一个加速度计&#xff0c;可以在三轴模式下使用一个加速度计找出客户投诉车厢内振动最强烈的区域。例如将加速度计连接到驾驶员座椅…

PCIe学习必读——《PCI Express System Architecture》

PCI Express (peripheral component interconnect express) 简称 PCIe&#xff0c;是一种高速串行计算机扩展总线标准。是一种全双工总线&#xff0c;使用高速串行传送方式&#xff0c;能够支持更高的频率&#xff0c;连接的设备不再像 PCI 总线那样共享总线带宽。PCIe目前发布…

完美滤波器

完美滤波器 如下图所示&#xff0c;第 j j j级为输入图像&#xff0c;其中第 j − 1 j-1 j−1级为第 j j j级的尺寸减半的存在&#xff0c;直至为 1 1 1\times 1 11 的大小&#xff0c;这样的模式被称为图像金字塔 设原图像像素点个数为 N 2 N^2 N2&#xff0c;则图像金字塔的…

IP地址规划的基本方法与最佳实践

IP地址规划是建立网络基础设施的关键步骤之一&#xff0c;它涉及到为网络中的设备分配唯一的IP地址&#xff0c;确保网络的高效性和可管理性。本文将介绍IP地址规划的基本方法和最佳实践&#xff0c;以帮助网络管理员设计并维护高效的IP地址方案。 1. 了解网络拓扑结构&#x…

Java的字符流和字节流

Java的字节流&#xff1a;把数据从程序存储到文件&#xff0c;把数据从文件读取程序中 File&#xff1a;只操作文件和文件属性&#xff0c;createNewFile getPath--->项目目录 ,getAbsolutePath() getName()---->substring() list()-->列出当前目录的所有文件或文…

vue+less+style-resources-loader 配置全局颜色变量

全局统一样式后&#xff0c;可配置vue.config.js实现全局颜色变量&#xff0c;方便在编写时使用统一风格的色彩 一、新建global.less 二、下载安装style-resources-loader npm i style-resources-loader --save-dev三、在vue.config.js中进行配置 module.exports {pluginOpt…

架构图是什么,该怎么制作?

架构图是指可视化展示软件、系统、应用程序、网络等各种体系结构的一类图表或图形&#xff0c;它能够形象地展示体系结构中各个组成部分和它们之间的关系。 架构图的类型 架构图的种类比较多&#xff0c;逐一列举不太合适&#xff0c;这里只列举一些常见的架构图类型&#…

爬虫-基于python的旅游景点推荐系统的设计与实现-计算机毕业设计源码24044

摘 要 21世纪时信息化的时代&#xff0c;几乎任何一个行业都离不开计算机&#xff0c;将计算机运用于旅游景点管理也是十分常见的。过去使用手工的管理方式对旅游景点进行管理&#xff0c;造成了管理繁琐、难以维护等问题&#xff0c;如今使用计算机对旅游景点的各项基本信息进…