SpringBoot 基于Redis的消息队列(基于发布订阅模型)

news2025/1/12 21:06:19

SpringBoot下Redis消息队列(基于发布订阅模型)

1. 什么是生产者/消费者模式?

消息队列一般是有两种场景

1、种是发布者订阅者模式
2、种是生产者消费者模式

生产者消费者模式 :生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。

发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。

2. 用Redis 实现发布订阅模型

此处演示用Redis实现的发布订阅模型

也可以用此模型广播消息时, 在消息接收方加入Redis分布式锁 实现单一消费者消费

2.1. 新建通道枚举类RedisMqChannelEnum

import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * @Author:
 * @Date:2023/7/3 16:17
 * @Des: MessageChannelEnum 消息队列通道枚举
 */
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("消息队列通道枚举类")
public enum RedisMqChannelEnum {

    WEBSOCKET_MSG_NOTIFY("channelId-websocket-notify", "websocket消息推送广播通道");

    private String channelId;

    private String channelString;


}

2.2. 消息队列通道注册类 RedisMqMessageListenerConfig



import com.sinotrans.gtp.enums.RedisMqChannelEnum;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * @Author:
 * @Date:2023/7/3 16:11
 * @Des: RedisMessageListenerConfig 消息队列通道注册
 */
@Configuration
public class RedisMqMessageListenerConfig {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
                                                                       RedisMqMessageReceiver redisMqMessageReceiver
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        container.addMessageListener(redisMqMessageReceiver, new PatternTopic(RedisMqChannelEnum.WEBSOCKET_MSG_NOTIFY.getChannelId())); // 订阅指定频道的消息


        // ...可监听多个通道
        return container;
    }
}

2.3. 消息发布类 RedisMqMessagePublisher



import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

/**
 * @Author:
 * @Date:2023/7/3 16:10
 * @Des: MessagePublisher
 */
@Service
@Slf4j
public class RedisMqMessagePublisher {

    private final RedisTemplate<String, String> redisTemplate;

    public RedisMqMessagePublisher(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 往消息队列发布消息
     *
     * @param channel 通道名称
     * @param message 消息
     */
    public void publishMessage(String channel, String message) {
        log.info("消息队列产生消息: channel:{}, message:{}", channel, message);
        redisTemplate.convertAndSend(channel, message);
    }
}

2.4. 消息订阅类RedisMqMessageReceiver



import com.sinotrans.gtp.entity.TransmissionReport;
import com.sinotrans.gtp.enums.RedisMqChannelEnum;
import com.sinotrans.gtp.service.CallService;
import com.sinotrans.gtp.utils.GsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

/**
 * @Author:
 * @Date:2023/7/3 16:07
 * @Des: RedisMessageReceiver 消息接收
 */
@Component
@Slf4j
public class RedisMqMessageReceiver implements MessageListener {

    @Autowired
    private CallService callService;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 处理接收到的消息
        String channel = new String(message.getChannel());
        String body = new String(message.getBody());
        log.info("消息队列消费消息: channel:{}, message:{}", channel, body);

        // 根据通道名称执行相应的逻辑
        if (RedisMqChannelEnum.WEBSOCKET_MSG_NOTIFY.getChannelId().equals(channel)) {
            log.info("转发至通道: {}", RedisMqChannelEnum.WEBSOCKET_MSG_NOTIFY.getChannelString());
            // 处理 websocket消息推送广播通道 的消息
            // callService.sendFrontClientMsg(GsonUtils.fromJson(body, TransmissionReport.class));
        }
        // ...

    }
}

2.5. 使用

@Autowired
private RedisMqMessagePublisher redisMqMessagePublisher;

// 调用消息队列模型进行发布消息
redisMqMessagePublisher.publishMessage(RedisMqChannelEnum.WEBSOCKET_MSG_NOTIFY.getChannelId(), transmissionReportJson);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/720580.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

详解DNS协议!

前言 想要不同的计算机之间进行通信&#xff0c;是需要知道对方的IP的&#xff0c;可是为什么我们平时很少用到ip&#xff0c;而是用到域名这种东西呢&#xff1f; 其实主要是为了方便阅读&#xff0c;让我们记住一串的ip还不如记域名来的方便。 当我们访问域名的时候&#xf…

【MYSQL基础】基础知识了解

基础概念 数据库(DataBase&#xff0c;简称DB)&#xff0c;用于存储和管理大量数据的仓库。 数据库特点 持久化存储数据的。其实数据库就是一个文件系统方便存储和管理数据使用了统一的方式操作数据库-- SQL 数据库有哪些 MYSQL: 开源免费的数据库&#xff0c;小型的数据库…

初出茅庐的小李博客之链表知识1

链表知识1&#xff1a; 数组的特点&#xff1a; 空间连续&#xff0c;方便访问&#xff0c;只要知道首元素地址&#xff0c;就可以访问每个元素 数组的缺点&#xff1a; 需要提前分配固定大小的空间&#xff0c;一旦分配大小就不能改变 &#xff0c;空间分配小了不够用&…

NeurIPS 2022 | HUMUS-Net:用于加速MRI重建的混合展开多尺度网络结构

文章目录 NeurIPS 2022 | HUMUS-Net&#xff1a;用于加速MRI重建的混合展开多尺度网络结构摘要方法 NeurIPS 2022 | HUMUS-Net&#xff1a;用于加速MRI重建的混合展开多尺度网络结构 Code: https://github.com/z-fabian/HUMUS-Net 摘要 在加速MRI重建中&#xff0c;患者的解剖…

vscode解决本地浏览器运行项目时的跨域问题-Live server

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 总结 最近在用face-api.js开发前端的实时人脸识别&#xff0c;加载已经训练好的tf模型&#xff0c;这一步需要加载模型json文件&#xff0c;但是本地测试的时候控制…

GAMES101笔记 Lecture 09 Shading3(Texture Mapping Cont.)

目录 Interpolation Across Triangles: Barycentric Coordinates(重心坐标)Interpolation Across Triangles(三角形内的插值)Why do we want to interpolate(我们为什么要在三角形内插值)What do we want to interpolate?(我们想插值得到什么&#xff1f;)How do we interpola…

vue echarts k线图 子功能设置

1 图中自定义选择区间, 手动鼠标拉取区间显示 2 底部数据选择条 dataZoom: [{type: inside,xAxisIndex: [0, 1],start: 98,end: 100},{show: true, // 这个是打开数据 选择条xAxisIndex: [0, 1],type: slider,top: 85%,start: 98,end: 100}], 3 鼠标在 k线图 选择区域 显示 的…

短视频抖音账号矩阵系统源码开发者自研(三)

一、站在开发者角度布局开发本套体系系统 开发技术新的方向。通过持续的技术创新和用户需求分析&#xff0c;我们将更加注重个性化开发本套短视频矩阵源码系统&#xff0c;目前市面上涵盖的基本功能有视频剪辑、绑定发布、智能回复、数据统计等一些基本的功能。此外我们在开发…

El-table 懒加载表格中新增、删除遇到的问题

前言 我是用的版本是&#xff1a; 官方给的例子中只是一个单纯的展示&#xff0c;但实际需求中可能会有一些其他需求&#xff0c;比如新增、修改。 然后遇到了各种问题&#xff0c;因此记录一下。 记录 :tree-props"{ children: children, hasChildren: hasChildren }…

NumPy怎样使用花式索引或布尔索引访问元素?

除了整数索引之外&#xff0c;NumPy中还提供了两个形式比较复杂的索引——花式索引和布尔索引&#xff0c;下面对这两种索引的基本用法进行详细的讲解。 1.花式索引 花式索引指以整数组成的数组或列表为索引。当使用花式索引访问一维数组时&#xff0c;程序会将花式索引对应的…

音视频基础 - YUV和RGB

1. 音视频名词概念 1.1 像素 像素是一张图片的基础单位&#xff0c;pixel&#xff0c;简称px 无数个像素组合在一起&#xff0c;就形成了一张图片。 1.2 分辨率 分辨率 垂直像素*水平像素&#xff0c;(理论上) 图像的分辨率越高&#xff0c;图像就越清晰。 比如下面左边这张…

【链表OJ】链表的回文结构

⭐️ 往期相关文章 &#x1f4ab;链接1&#xff1a;链表分割 &#x1f4ab;链接2&#xff1a;链表中倒数第k个结点(快慢指针问题) &#x1f4ab;链接3&#xff1a;leetcode 876.链表的中间结点(快慢指针问题) &#x1f4ab;链接4&#xff1a;leetcode 206.反转链表 &#x1f4…

游游画U(秒用c++ string函数)

看到这道题&#xff0c;第一反应是简单的模拟&#xff0c;上手就写&#xff0c;后来看大佬代码&#xff0c;还是我太蠢了 我的&#xff1a; #include <bits/stdc.h> using namespace std; typedef long long LL;int main() {int n;cin>>n;string s ""…

【Spark】RDD转换算子

目录 map mapPartitions mapPartitionsWithIndex flatMap glom groupBy shuffle filter sample distinct coalesce repartition sortBy ByKey intersection union subtract zip partitionBy reduceByKey groupByKey reduceByKey 和 groupByKey 的区别 a…

kafka的broker和replica和文件存储

zookeeper中存储的kafka信息 /brokers/ids&#xff0c;记录存在的服务器id/brokers/topics/test/partitions/0/state&#xff0c;记录leader和可用副本服务器/comsumers&#xff0c;0.9版本之前存储消费者的offset信息&#xff0c;但是会产生zookeeper和broker的跨节点通信/co…

SPSS读取数据文件(一)

1.读取Excel数据文件 &#xff08;1&#xff09;选择“文件”-“打开”-“数据”,在弹出的“打开数据”对话框下选择Excel文件&#xff0c;如图所示 &#xff08;2&#xff09;选择要打开的Excel文件&#xff0c;点击“打开”&#xff0c;如图所示 &#xff08;3&#xff09;可…

[论文总结]YOLO v1、YOLO v2、YOLO v3、YOLO v4、YOLOv5

背景 在这里我们主要介绍YOLO 系列的相关目标检测算法&#xff0c;从最开始的YOLO v1 一直到 YOLO v5。本文也借鉴了其他文档和原始论文。总结下来这五个方法的演进线路如下表格所示。 对比维度YOLO v1YOLO v2YOLO v3YOLO v4YOLO v5backboneVGGdarknet19darknet53darknet53da…

Lua学习笔记:浅谈对垃圾回收的理解

前言 本篇在讲什么 Lua的垃圾回收 本篇适合什么 适合初学Lua的小白 本篇需要什么 对Lua语法有简单认知 依赖Sublime Text编辑器 本篇的特色 具有全流程的图文教学 重实践&#xff0c;轻理论&#xff0c;快速上手 提供全流程的源码内容 ★提高阅读体验★ &#x1f…

Unity:sentinel key not found (h0007)

SSD换电脑&#xff0c;unity 编辑器无法打开&#xff1b; 具体步骤&#xff1a; 删除这个路径下的文件 C:\ProgramData\SafeNet 下 Sentinel LDK 打开Windows 的Cmd 命令行&#xff0c;输入编辑器版本下Unity.exe的路径&#xff0c; CD E:\Dev_Env\Unity\Hub\Editor\2020.3.3…

如何将 arduino-esp32 库作为 ESP-IDF 组件使用?

相关文档 arduino-esp32 SDKESP-IDF SDKESP-IDF 软件环境搭建说明Arduino 软件环境使用说明Arduino as an ESP-IDF component &#xff08;官方说明&#xff09; 环境准备 目前&#xff0c;最新 Master 版本的 arduino-esp32 SDK 要求使用 v4.4 版本的 ESP-IDF SDK 软件编译环…