【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶

news2025/1/22 12:50:07

 🎉🎉欢迎光临🎉🎉

🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀

🌟特别推荐给大家我的最新专栏《Redis实战与进阶》

本专栏纯属为爱发电永久免费!!!

这是苏泽的个人主页可以看到我其他的内容哦👇👇

努力的苏泽icon-default.png?t=N7T8http://suzee.blog.csdn.net/

最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了  

我们用的是云托管的的服务 那自然是部署中间件到云服务上去了 服务是一路开通 结果到了需要调试的时候 怎么也连不上 (说是内网直连,但关键是 同事们都在线下做本地测试的呀)

直接无语了 面对这一场景 怎么办?业务还要继续 等着交货的  于是我想起了之前学过的技术栈 

Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作  步骤还比MQ简单  下面就来看是如何实现的

正片

目录

最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了  

Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作  步骤还比MQ简单  下面就来看是如何实现的

正片

Redis作为消息队列的优缺点:

使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点和:

缺点也很明显:

应用场景:

Redis实现消息队列系统 实现步骤:

配置Redis:

设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:

实现消息的发布和订阅功能。

实战与改良

代码解释

我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支  那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)

但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低

那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)  

遇到的问题:

对了 中途遇到了这样一个错误

原因与分析:

实际业务中的测试

发布服务

订阅服务(监听服务)

测试结果


Redis作为消息队列的优缺点:

使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点和:

  1. 简单轻量:Redis是一个内存中的数据存储系统,具有轻量级和简单的特点。相比较专门的消息队列系统,使用Redis作为消息队列不需要引入额外的组件和依赖,可以减少系统的复杂性。

  2. 速度快:由于Redis存储在内存中,它具有非常高的读写性能。这对于需要低延迟的应用程序非常有优势。

  3. 多种数据结构支持:Redis提供了丰富的数据结构,如列表、发布/订阅、有序集合等。这使得Redis在处理不同类型的消息和任务时更加灵活。

  4. 数据持久化:Redis可以通过将数据持久化到磁盘来提供数据的持久性。这意味着即使Redis重启,之前的消息也不会丢失。

  5. 广泛的应用场景:Redis不仅可以用作消息队列,还可以用作缓存、数据库、分布式锁等多种用途。如果你的应用程序已经使用了Redis,那么使用Redis作为消息队列可以减少技术栈的复杂性。

缺点也很明显:

  1. 缺少一些高级特性:相对于专门的消息队列系统,Redis在消息队列方面的功能可能相对简单。例如,它可能缺乏一些高级消息传递功能,如消息重试、消息路由、持久化消息等。

  2. 可靠性和一致性:Redis的主要设计目标是提供高性能和低延迟,而不是强一致性和高可靠性。在某些情况下,Redis可能会丢失消息,或者在出现故障时可能无法提供持久性保证。

应用场景:

适用于简单的中小型项目 如果功能简单,访问量并不大可以考虑
如果你的应用程序对可靠性和高级功能有严格要求,并且需要处理大量的消息和复杂的消息路由,那么使用专门的消息队列系统可能更合适。

Redis实现消息队列系统 实现步骤:

配置Redis:

  1. 首先,确保你已经正确地配置了Redis和Lettuce依赖项,并创建了LettuceConnectionFactory对象。

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
      redis:
        host: 
        port: 6379
        password: 
        lettuce:
          pool:
            max-active: 1000
            max-idle: 1000
            min-idle: 0
            time-between-eviction-runs: 10s
            max-wait: 10000

  2. 创建一个RedisTemplate对象,并将LettuceConnectionFactory设置为其连接工厂:

 @Bean
    public RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setDefaultSerializer(new StringRedisSerializer());
        return template;
    }

设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:

redisTemplate.setDefaultSerializer(new StringRedisSerializer());

实现消息的发布和订阅功能。

  • 发布消息:
    redisTemplate.convertAndSend("channel_name", "message_payload");

    在上述代码中,"channel_name"是消息的通道名称,"message_payload"是要发布的消息内容。

  • 订阅消息:
  • 首先,创建一个MessageListener实现类来处理接收到的消息:

public class MessageListenerImpl implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 处理接收到的消息
        String channel = new String(message.getChannel());
        String payload = new String(message.getBody());
        // 执行自定义的逻辑
    }
}

创建一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类:

LettuceMessageListenerAdapter listenerAdapter = new LettuceMessageListenerAdapter(new MessageListenerImpl());
listenerAdapter.afterPropertiesSet();

创建一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器:

RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(lettuceConnectionFactory);
listenerContainer.addMessageListener(listenerAdapter, new ChannelTopic("通道名称"));
listenerContainer.start();

通过以上步骤,我们创建了一个LettuceConnectionFactory对象来与Redis服务器建立连接。然后,我们创建了一个MessageListener实现类来处理接收到的消息。接下来,我们创建了一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类。最后,我们创建了一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器,然后启动容器以开始监听指定通道上的消息。

以上的方案 好处就是 可以很明显的知道监听者在哪个部分 监听对应通道的信息 然而 业务当中 如果每一个对应模块的业务和通道都建立一个监听者来进行监听(我们假设每一个就业务所要得到消息以后所执行的逻辑都不相同) 那这个工作量就会暴增 

于是就有了第二种写法 :

实战与改良

/***
 * @title MessageManager
 * @author SUZE
 * @Date 2-17
 **/
@Component
public class ReservedMessageManager {
    private String ListenerId;
    private String UserId;
    private String message;
    private final RedisTemplate<String, String> redisTemplate;

    @Autowired
    public ReservedMessageManager(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        subscribeToChannel("reserved");
    }
    @Resource
    private SmsServer smsServer;

    public void publishMessage(String channel, reserveMessage message) {
        String  Message=serialize(message);
        redisTemplate.convertAndSend("channel_name", "message_payload");
        redisTemplate.convertAndSend(channel, Message);
    }
    // 接收到消息时触发的事件
    private void handleReserveMessage(String channel, reserveMessage reserveMessage) {
        if (reserveMessage != null) {
            String userId = reserveMessage.getUserId();
            String ListenerId=reserveMessage.getListenerId();
            String message = reserveMessage.getMessage();
            //TODO 处理接收到的消息逻辑 这里后续要对Message进行一个检测他有wait agree和refused和over四种状态 思种状态就是不一样的发送内容
            switch (message){
                //TODO 消息要给两边都发 所以要发两份 发信息的文案
                case "wait":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "agree":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "refuse":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "over":
                    //这里要操作文档系统了

                    //拒绝的话 那就要监听一下
                    smsServer.sendSms(userId,ListenerId,message);
                    break;

            }
            //smsServer.sendSms(userId,ListenerId,message);
            // 其他处理逻辑...
        }
    }

    public void subscribeToChannel(String channel) {
        redisTemplate.execute((RedisCallback<Object>) (connection) -> {
            connection.subscribe((message, pattern) -> {
                String channelName = new String(message.getChannel());
                byte[] body = message.getBody();
                // 解析接收到的消息
                switch (channelName){
                    case "reserved":
                        reserveMessage reserveMessage = deserializeMessage(new String(body));
                        handleReserveMessage(channelName, reserveMessage);
                        break;
                    //还有其他的通道 例如refuse就是一个 拒绝通道 专门监听拒绝的理由
                }
            }, channel.getBytes());
            return null;
        });
    }
    // 反序列化
    private reserveMessage deserializeMessage(String body) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(body, reserveMessage.class);
        } catch (IOException e) {
            // 处理反序列化异常
            e.printStackTrace();
            return null;
        }
    }

    // 序列化
    public String serialize(reserveMessage reserveMessage) throws SerializationException {
        if (reserveMessage == null) {
            return null;
        }
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.writeValueAsString(reserveMessage);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing object", e);
        }
    }

}

代码解释

  1. subscribeToChannel方法接受一个channel参数,用于指定要订阅的通道名称。
  2. redisTemplate.execute方法用于执行Redis操作,并传入一个RedisCallback回调函数。
  3. 回调函数使用lambda表达式的形式实现,接受一个connection参数,表示与Redis的连接。
  4. 在回调函数中,调用connection.subscribe方法来订阅通道。该方法接受一个回调函数作为参数,用于处理接收到的消息。
  5. 在消息回调函数中,首先从message对象中获取通道名称和消息体。
  6. 使用new String(message.getChannel())将通道名称转换为字符串类型,并存储在channelName变量中。
  7. 使用message.getBody()获取消息体的字节数组表示,并存储在body变量中。
  8. switch语句中,根据通道名称进行不同的处理。在这个例子中,仅处理"reserved"通道。
  9. 对于"reserved"通道的处理,调用deserializeMessage方法将消息体反序列化为reserveMessage对象,并将其存储在名为reserveMessage的局部变量中。
  10. 调用handleReserveMessage方法,将通道名称和反序列化后的reserveMessage对象作为参数进行处理。
  11. handleReserveMessage方法用于处理接收到的保留消息的逻辑。它检查消息类型,并根据类型执行不同的操作。根据消息类型,它调用smsServer.sendSms方法向指定的userIdlistenerId发送短信。

我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支  那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)

但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低

那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)  

遇到的问题:


对了 中途遇到了这样一个错误

错误信息:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `TopOne.MessageSystem.entity.reserveMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

原因与分析:

reserveMessage类缺少默认构造函数,这导致Jackson库无法构造该类的实例。错误消息中提到了以下内容:"Cannot construct instance of TopOne.MessageSystem.entity.reserveMessage (no Creators, like default constructor, exist)"。
为了使Jackson能够正确地反序列化对象,需要在reserveMessage类中添加一个默认构造函数。默认构造函数是一个无参数的构造函数,它不需要任何参数来创建对象。
在你的reserveMessage类中

这个是改好的封装类:
 

@Data
public class reserveMessage {
    private String UserId;
    private String ListenerId;
    private String message;


    public reserveMessage() {
        // 默认构造函数
    }
    public reserveMessage(String userId, String ListenerId,String message) {
        this.UserId = userId;
        this.ListenerId = ListenerId;
        this.message=message;
    }


}

实际业务中的测试

发布服务

订阅服务(监听服务)

测试结果

成功

这里面的打印是代替了原本业务中的短信发送 也算是成了

这一期就到这 感谢观看

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1453159.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

报文鉴别、实体鉴别

目录 鉴别 1 报文鉴别 1.1 用数字签名进行鉴别&#xff08;原理&#xff09; 可保证机密性的数字签名 1.2 密码散列函数 MD5 算法 MD5 算法计算步骤 安全散列算法 SHA-1 1.3 用报文鉴别码实现报文鉴别 用报文鉴别码 MAC 鉴别报文 使用已签名的报文鉴别码 MAC 对报…

工程师日常:海丰县附城镇鹿境元宵开灯活动

海丰县附城镇鹿境元宵开灯活动 &#xff08;蔡惠进搜集整理&#xff09; 鹿境乡春节正月初十大老热&#xff0c;全县家喻户晓。为纪念先祖功德&#xff0c;在本乡车地建立蔡氏“济阳堂”大祖祠&#xff0c;并定年初十为开灯日&#xff0c;大祖开灯代代相传。凡移居外乡裔孙、“…

机器学习8-决策树

决策树&#xff08;Decision Tree&#xff09;是一种强大且灵活的机器学习算法&#xff0c;可用于分类和回归问题。它通过从数据中学习一系列规则来建立模型&#xff0c;这些规则对输入数据进行递归的分割&#xff0c;直到达到某个终止条件。 决策树的构建过程&#xff1a; 1.…

java 宠物医院系统Myeclipse开发mysql数据库web结构jsp编程计算机网页项目

一、源码特点 java 宠物医院系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5.0&…

463. Island Perimeter(岛屿的周长)

问题描述 给定一个 row x col 的二维网格地图 grid &#xff0c;其中&#xff1a;grid[i][j] 1 表示陆地&#xff0c; grid[i][j] 0 表示水域。 网格中的格子 水平和垂直 方向相连&#xff08;对角线方向不相连&#xff09;。整个网格被水完全包围&#xff0c;但其中恰好有…

希捷与索尼集团合作生产HAMR写头激光二极管

最近有报道指出&#xff0c;希捷&#xff08;Seagate&#xff09;在生产其采用热辅助磁记录&#xff08;HAMR&#xff09;技术的大容量硬盘时&#xff0c;并非所有组件都在内部制造。根据日经新闻的一份新报告&#xff0c;希捷已与索尼集团合作&#xff0c;由索尼为其HAMR写头生…

qml中边界图片BorderImage的使用

1、基本概念 2、案例 原图&#xff0c;120*120像素 &#xff08;1&#xff09;水平和垂直方向上都设置为拉伸模式 import QtQuick 2.12 import QtQuick.Window 2.12 import QtQuick.Controls 2.12 import QtQuick.Layouts 1.12ApplicationWindow {id: windowvisible: truew…

【c语言】人生重开模拟器

前言&#xff1a; 人生重开模拟器是前段时间非常火的一个小游戏&#xff0c;接下来我们将一起学习使用c语言写一个简易版的人生重开模拟器。 网页版游戏&#xff1a; 人生重开模拟器 (ytecn.com) 1.实现一个简化版的人生重开模拟器 &#xff08;1&#xff09; 游戏开始的时…

【学习笔记】一文打通Docker!(项目部署orCTF)

Docker What is Docker? 利用Docker 可以快速安装应用&#xff0c;Docker会自动搜索并下载应用镜像(image)。镜像不仅包含应用本身&#xff0c;还包含应用运行所需要的环境&#xff0c;配置&#xff0c;系统函数库。 注意这个系统函数库&#xff0c;相当于在不同的操作版本…

反向迭代器------封装的力量

目录 一、list封装中模板参数Ref和Ptr的理解 二、反向迭代器的实现 一、list封装中模板参数Ref和Ptr的理解 对于反向迭代器&#xff0c;是我们在前面STL模拟实现中留下的一个问题。在之前的文章中&#xff0c;我们极大程度上的利用了模板&#xff0c;从而减少了许多的代码&…

【半监督图像分割 2023 】BHPC

【半监督图像分割 2023 】BHPC 论文题目&#xff1a;Semi-supervised medical image segmentation via hard positives oriented contrastive learning 中文题目&#xff1a;通过面向硬阳性的对比学习进行半监督医学图像分割 论文链接&#xff1a; 论文代码&#xff1a;https:/…

CCF编程能力等级认证GESP—C++6级—20231209

CCF编程能力等级认证GESP—C6级—20231209 单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09;判断题&#xff08;每题 2 分&#xff0c;共 20 分&#xff09;编程题 (每题 25 分&#xff0c;共 50 分)闯关游戏工作沟通 答案及解析单选题判断题编程题1编程题2 单选题…

Midjourney绘图欣赏系列(五)

Midjourney介绍 Midjourney 是生成式人工智能的一个很好的例子&#xff0c;它根据文本提示创建图像。它与 Dall-E 和 Stable Diffusion 一起成为最流行的 AI 艺术创作工具之一。与竞争对手不同&#xff0c;Midjourney 是自筹资金且闭源的&#xff0c;因此确切了解其幕后内容尚不…

2024023期传足14场胜负前瞻

新的一年祝大家行大运、发大财、中大奖&#xff01;2024023期赛事由英超2场&#xff0c;德甲2场、意甲4场、西甲3场、法甲3场组成。售止时间为2月18日&#xff08;周六&#xff09;21点30分&#xff0c;敬请留意&#xff1a; 本期中深盘较少&#xff0c;1.5以下赔率仅1场&#…

二、ActiveMQ安装

ActiveMQ安装 一、相关环境二、安装Java8三、下载安装包四、启动五、其他命令六、开放端口七、后台管理 一、相关环境 环境&#xff1a;Centos7.9安装ActiveMQ版本&#xff1a;5.15.9JDK8 二、安装Java8 安装教程&#xff1a;https://qingsi.blog.csdn.net/article/details/…

RCS系统之:基础算法

设计仓库机器人的控制管理系统涉及到路径规划、任务分配、库存管理、通信系统等方面。以下是一个基本的仓库机器人控制管理系统方案的概述&#xff1a; 路径规划&#xff1a;设计一个路径规划系统&#xff0c;用于确定机器人在仓库内的最佳行驶路径&#xff0c;以最大程度地提…

学习数据结构和算法的第9天

题目讲解 移除元素 ​ 给你一个数组nums和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val的元素&#xff0c;并返回移除后数组的新长度。 ​ 不要使用额外的数组空间&#xff0c;你必须仅使用0(1)额外空间并 原地 修改输入数组。 ​ 元素的顺序可以改变。你不需要…

使用Taro开发鸿蒙原生应用——快速上手,鸿蒙应用开发指南

导读 本指南为开发者提供了使用 Taro 框架开发鸿蒙原生应用的快速入门方法。Taro&#xff0c;作为一个多端统一开发框架&#xff0c;让开发者能够使用一套代码同时适配多个平台&#xff0c;包括鸿蒙系统。文章将详细介绍如何配置开发环境&#xff0c;以及如何利用 Taro 的特性…

leetcode hot100 拆分整数

在本题目中&#xff0c;我们需要拆分一个整数n&#xff0c;让其拆分的整数积最大。因为每拆分一次都和之前上一次拆分有关系&#xff0c;比如拆分6可以拆成2x4&#xff0c;还可以拆成2x2x2&#xff0c;那么我们可以采用动态规划来做。 首先确定dp数组的含义&#xff0c;这里dp…

linux学习进程控制【创建-终止-等待】

目录 1.进程创建 1.1fork函数 1.2写时拷贝 2.进程终止 2.1进程退出场景 2.2进程退出方式 3.进程等待 3.1进程等待的必要性 3.2等待方式 3.2.1wait&#xff08;&#xff09; 3.2.2waitpid&#xff08;&#xff09; 3.3轮训等待 总结&#xff1a; 1.进程创建 …