RabbitMQ(Direct 订阅模型-路由模式)的使用

news2025/1/18 21:07:08

RabbitMQ(Direct 订阅模型-路由模式)

一,Direct 订阅模型-路由模式介绍(Routing)

​ 订阅模型-路由模式,此时生产者发送消息时需要指定 RoutingKey,即路由 Key,Exchange 接收到消息时转发到与 RoutingKey 相匹配的队列中。

​ direct的意思是直接的,direct类型的Exchange会将消息转发到指定Routing key的Queue上,Routing key的解析规则为精确匹配。也就是只有当producer发送的消息的Routing key与某个Binding key相等时,消息才会被分发到对应的Queue上。
路由模式:
在这里插入图片描述

1、每个消费者监听自己的队列,并且设置routingkey;

2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列;

应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法,

二,使用

1.添加依赖

<!--引入rabbitMQ的依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.修改配置文件

  rabbitmq:
    host: 192.168.193.131
    publisher-confirm-type: correlated  #开启异步消息确认
    publisher-returns: true #开启publisher-return的机制
    template:
      mandatory: true  #消息路由失败,调用ReturnCallback
    listener:
      simple:  #消息监听器的一种简单配置方式
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
        acknowledge-mode: manual
      direct: #特定的消息监听器配置类型或者是指定一个直连型的交换机),用于消息的路由和分发
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
        acknowledge-mode: manual #手动确认接收到的消息

3.创建配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Description:rabbitMQ配置类 会在应用启动时被加载。
 * @bean  :用于指示一个方法将会产生一个 bean 对象,并添加到 Spring 容器中。
 */
@Configuration
public class MyRabbitMQConfig {


    /**
     * @Description:创建一个名为 "loginQueue" 的 RabbitMQ 队列。
     * 参数 true 表示这个队列是持久化的,这意味着在 RabbitMQ 服务器重启后,这个队列仍然会存在,以保证消息不丢失
     */
    @Bean
    public Queue loginQueue(){
        /*第一个参数为队列的名称 第二个参数为true 表示队列持久化 保障消息不丢失*/
        return new Queue("loginQueue",true);
    }

    /**
     * @Description:方法用于创建一个名为 "directExchange" 的 Direct Exchange(直连交换机)。
     * Direct Exchange 是 RabbitMQ 中一种简单的交换机类型,它会根据消息的 routing key 将消息路由到相应的队列
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }


    /**
     * @Description:创建一个绑定,将之前创建的 "loginQueue" 队列绑定到 "directExchange" 交换机上,并且指定 routing key 为 "login"     
     */
    @Bean
    public Binding loginQueueBinding(){
        return BindingBuilder.bind(loginQueue()).to(directExchange()).with("login");
    }

}

4.注入RabbitMQ模版引擎

//RabbitTemplate是Spring AMQP项目中的一个关键组件,用于在应用程序和RabbitMQ消息代理之间发送和接收消息。
@Autowired
private RabbitTemplate rabbitTemplate;

5.消息的发送

//记录登陆日志 记录到RabbitMQ
//定义消息的唯一ID 防止消息重复消费
String msgId = "msg-" + UUID.randomUUID().toString();
//定义消息内容
String msgBody = JSON.toJSONString(tbUser);

//将消息唯一表示存入redis缓存  防止消息重复消费
stringRedisTemplate.opsForValue().set(msgId, msgBody);

/*组装消息体 发送消息队列*/
MessageVO messageVO = new MessageVO();
messageVO.setMsgID(msgId);
messageVO.setMsgBody(msgBody);

/*convertAndSend: 这是RabbitTemplate提供的一个方法,用于将消息转换并发送到RabbitMQ队列或交换机。
        "directExchange": 这是消息要发送到的目标交换机的名称。在RabbitMQ中,交换机是消息的分发中心,它根据特定的路由规则将消息发送到一个或多个队列。
        "login": 这是消息的路由键  routing key 。在直接交换机模式下,消息的路由键与队列的绑定键一致时,消息会被发送到该队列。
        JSON.toJSONString(messageVO): 这是要发送的消息内容。在这里,messageVO对象被转换成JSON格式的字符串,然后作为消息发送到RabbitMQ。
        这行代码的作用是将一个消息发送到名为directExchange的交换机,并且路由键为login,消息内容为messageVO对象的JSON字符串表示。*/
rabbitTemplate.convertAndSend("directExchange", "login", JSON.toJSONString(messageVO));

6.消息的接收(监听)

/**
     * @param (org.springframework.amqp.core.Message)message :RabbitMQ 消息的对象,包含消息体和消息属性等信息。
     * @param(com.rabbitmq.client.Channel;)channel:RabbitMQ 的通道,用于手动确认消息消费。
     * @return void
     * @date 2024/5/24 19:59
     * @Description: TODO
     * @RabbitListener 注解:主要用于消费名为 "loginQueue" 的队列中的消息。
     *
     * @RabbitListener(queues = "loginQueue"):这个注解用于声明一个 RabbitMQ 消息监听器,
     * 它会监听名为 "loginQueue" 的队列,一旦有消息到达这个队列,就会触发 recvLogMessage 方法进行消费。
     */
    @RabbitListener(queues = "loginQueue")
    public void recvLogMessage(Message message, Channel channel) {
        try {
            //-- 接收消息  获取消息体,然后使用 JSON 解析成 MessageVo 对象
            String s = new String(message.getBody());
            MessageVO messageVO = JSON.parseObject(s, MessageVO.class);
            System.out.println("RabbitMQ 收到消息:" + messageVO.getMsgID());

            //-- 判断消息重复消费
            String msgId = messageVO.getMsgID();
            if (!stringRedisTemplate.hasKey(msgId)) {
                // 消息已经消费过了,删除消息
                System.out.println("RabbitMQ 消息重复了:" + messageVO.getMsgID());
                // channel: 这是一个通道对象,用于与消息队列服务进行通信。通常在消息队列框架中,你需要首先建立一个通道(channel),然后通过这个通道发送、接收消息等操作。
                // basicAck: 这是一个确认消息的方法。在消息队列中,消费者(consumer)收到消息后,需要向消息队列服务确认(acknowledge)已经成功处理了这条消息,以便消息队列服务可以将这条消息标记为已处理。basicAck就是用来进行这个确认操作的方法。
                // message.getMessageProperties().getDeliveryTag(): 这部分代码用于获取消息的交付标签(delivery tag)。消息队列服务在向消费者传递消息时,会给每条消息分配一个唯一的标识符,即交付标签。消费者收到消息后,可以通过这个标签来确认消息。
                // true: 这个参数表示是否批量确认。如果设置为true,表示确认当前标签之前的所有未确认消息;如果设置为false,则仅确认当前标签所代表的消息。
                // 综合起来,这行代码的作用是:使用给定的通道对象(channel)对收到的消息进行确认,确认方式是通过交付标签(delivery tag)来指定。
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
                return;
            }                                                
                               
            //-- 消费消息
            TbUser tbUser = JSON.parseObject(messageVO.getMsgBody(), TbUser.class);
            TbLog tbLog = new TbLog();
            tbLog.setCreateTime(new Date());
            tbLog.setLogContent(messageVO.getMsgBody());

            tbLogMapper.insert(tbLog);

            //-- 删除消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

            //-- 删除消息的唯一ID,防止重复消费
            stringRedisTemplate.delete(msgId);
            System.out.println("RabbitMQ 登录日志消息,消费成功了~~~~~~~~~~~~~~~~~~~~~~");
        } catch (IOException e) {
            System.out.println("登录日志接收失败了~~~~~~~~~~~~~~~~~~~~~");
            e.printStackTrace();
        }
    }

7.设置回调函数 和 确认回调

/**
     * @param
     * @return void
     * @date 2024/5/24 20:03
     * @Description: TODO 这是一个自定义方法名,用于初始化RabbitMQ相关设置。
     * @PostConstruct: 这是一个Spring注解,用于标记在bean初始化之后立即执行的方法。
     */
    @PostConstruct
    public void initRabbitMQ() {
        System.out.println("设置RabbitMQ消息回调,确保消息不丢失");
        //-- return 回调 : 消息发送失败了,会被调用
        /*当消息发送失败时,RabbitMQ会调用此回调函数。回调函数会重新发送消息*/
        rabbitTemplate.setReturnCallback(
                new RabbitTemplate.ReturnCallback() {
                    /*returnedMessage 方法中,包含了以下四个参数:
					  Message message:表示发送失败的消息对象,包含消息的内容和元数据。
                      int replyCode:表示返回的错误码,用于指示消息发送失败的原因。
                      String replyText:表示返回的错误信息,用于指示消息发送失败的具体描述。
                      String exchange 和 String routingKey:分别表示发送消息时使用的交换机和路由键,用于指示消息发送的目的地。*/
                    @Override
                    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                        // 消息发送失败了,再次发送
                        System.out.println("RabbitMQ 消息发送失败,再次发送");
                        rabbitTemplate.convertAndSend(exchange, routingKey, message);
                    }
                }
        );

        //-- confirm 回调 : 消息发送无论成功还是失败,都会被回调
        //当消息发送成功或失败时,RabbitMQ都会调用此回调函数。回调函数会根据ack参数判断消息是否成功发送。
        rabbitTemplate.setConfirmCallback(
                new RabbitTemplate.ConfirmCallback() {
                    /*CorrelationData correlationData:这个参数是用来关联生产者发送的消息和消息发送确认结果的数据对象。在发送消息时,可以通过 CorrelationData 设置一个唯一的标识符来关联消息,在确认时可以通过这个标识符来确定是哪条消息的确认结果。
                   boolean ack:这个参数表示消息是否成功发送到 RabbitMQ 服务器并得到了确认。当 ack 为 true 时,表示消息发送成功;当 ack 为 false 时,表示消息发送失败。
                   String cause:这个参数用来描述消息发送失败的原因。当 ack 为 false 时,cause 参数会包含失败的具体原因,可以用来进行日志记录或者错误处理。*/
                    @Override
                    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                        if (ack) {
                            System.out.println("RabbitMQ 消息发送成功了");
                        } else {
                            System.out.println("RabbitMQ 消息发送失败了");
                        }
                    }
                }
        );
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1719865.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024.5.30学习记录

1 面经复习 LRU 手写 等 2 代码随想录二刷 3 rosebush完成 upload组件 初步完成 form组件

如何设置手机的DNS

DNS 服务器 IP 地址 苹果 华为 小米 OPPO VIVO DNS 服务器 IP 地址 中国大陆部分地区会被运营商屏蔽网络导致无法访问&#xff0c;可修改手机DNS解决。 推荐 阿里的DNS (223.5.5.5&#xff09;或 114 (114.114.114.114和114.114.115.115) 更多公开DNS参考&#xff1a; 苹果…

鸿蒙开发接口媒体:【@ohos.multimedia.media (媒体服务)】

媒体服务 说明&#xff1a; 本模块首批接口从API version 6开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 开发前请熟悉鸿蒙开发指导文档&#xff1a; gitee.com/li-shizhen-skin/harmony-os/blob/master/README.md点击或者复制转到。 媒体子系…

dubbo复习:(11)使用grpc客户端访问tripple协议的dubbo 服务器

一、服务器端依赖&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.…

尝试用智谱机器人+知识库,制作pytorch测试用例生成器

尝试用智谱机器人知识库,制作pytorch测试用例生成器 1 保存pytorch算子文档到txt2 创建知识库3 创建聊天机器人4 测试效果5 分享 背景:是否能将API的接口文档和sample放到RAG知识库,让LLM编写API相关的程序呢 小结:当前的实验效果并不理想,可以生成代码,但几乎都存在BUG 1 保存…

Window系统安装Docker

因为docker只适合在liunx系统上运行&#xff0c;如果在window上安装的话&#xff0c;就需要开启window的虚拟化&#xff0c;打开控制面板&#xff0c;点击程序&#xff0c;在程序和功能中可以看到启动和关闭window功能&#xff0c;点开后&#xff0c;找到Hyper-V&#xff0c;Wi…

DevExpress开发WPF应用实现对话框总结

说明&#xff1a; 完整代码Github​&#xff08;https://github.com/VinciYan/DXMessageBoxDemos.git&#xff09;DevExpree v23.2.4&#xff08;链接&#xff1a;https://pan.baidu.com/s/1eGWwCKAr8lJ_PBWZ_R6SkQ?pwd9jwc 提取码&#xff1a;9jwc&#xff09;使用Visual St…

FFmpeg 中 Filters 使用文档介绍

描述 这份文档描述了由libavfilter库提供的过滤器Filters、源sources和接收器sinks。 滤镜介绍 FFmpeg通过libavfilter库启用过滤功能。在libavfilter中,一个过滤器可以有多个输入和多个输出。为了说明可能的类型,我们考虑以下过滤器图: 这个过滤器图将输入流分成两个流,然…

微信小程序-wx.showToast超长文字展示不全

wx.showToast超长文字展示不全 问题解决方法1 问题 根据官方文档&#xff0c;iconnone&#xff0c;最多显示两行文字。所以如果提示信息较多&#xff0c;超过两行&#xff0c;就需要用其他方式解决。 解决方法1 使用vant组件里面的tost 根据官方例子使用&#xff1a; 1、在…

实用软件分享---- i茅台 在windows上自动预约和自动获取小茅运的软件

专栏介绍:本专栏主要分享一些实用的软件(Po Jie版); 声明1:软件不保证时效性;只能保证在写本文时,该软件是可用的;不保证后续时间该软件能一直正常运行;不保证没有bug;如果软件不可用了,我知道后会第一时间在题目上注明(已失效)。介意者请勿订阅。 声明2:本专栏的…

设计模式22——备忘录模式

写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用&#xff0c;主要是下面的UML图可以起到大作用&#xff0c;在你学习过一遍以后可能会遗忘&#xff0c;忘记了不要紧&#xff0c;只要看一眼UML图就能想起来了。同时也请大家多多指教。 备忘录模式&#xff08;Mement…

Unity 资源 之 100+风格化武器包-幻想RPG资源分享

Unity 资源 之 100风格化武器包-幻想RPG资源分享 前言资源包内容领取兑换码 前言 在游戏开发的广阔天地中&#xff0c;Unity 一直是备受青睐的强大引擎。而今天&#xff0c;我们要着重为大家介绍一个令人瞩目的 Unity 资源——100风格化武器包-幻想 RPG。 这个武器包就像是一…

移植其他命令行Vivado IDE的工具

移植其他命令行Vivado IDE的工具 介绍 本章介绍如何迁移各种AMD命令行工具以在AMD中使用 Vivado™集成设计环境&#xff08;IDE&#xff09;。 迁移ISE Partgen命令行工具 ISE™Design Suite Partgen工具可获得&#xff1a; •系统上安装的所有设备的信息 •详细的包装信息 您可…

2.5Bump Mapping 凹凸映射

一、Bump Mapping 介绍 我们想要在屏幕上绘制物体的细节&#xff0c;从尺度上讲&#xff0c;一个物体的细节分为&#xff1a;宏观、中观、微观宏观尺度中其特征会覆盖多个像素&#xff0c;中观尺度只覆盖几个像素&#xff0c;微观尺度的特征就会小于一个像素宏观尺度是由顶点或…

谷歌上架,可以用云手机来完成开发者个人号“20+14”封测?

根据谷歌的政策要求&#xff0c;自2023年11月13日之后创建的个人开发者账号&#xff0c;其应用必须满足20人连续14天封闭测试的要求&#xff0c;才能在Google Play 中上架正式版应用。 20人连续测试14天&#xff0c;这对大多数开发者&#xff0c;尤其是那些采用矩阵方式上架的开…

免费生物蛋白质的类chatgpt工具助手copilot:小分子、蛋白的折叠、对接

参考: https://310.ai/copilot 可以通过自然语言通话晚上蛋白质的相关处理:生成序列、折叠等 应该是agent技术调用不同工具实现 从UniProt数据库中搜索和加载蛋白质。使用ESM Fold方法折叠蛋白质。使用310.ai基础模型设计新蛋白质。使用TM-Align方法比较蛋白质。利用Protei…

neutron学习小结

概述 基于yoga版本学习neutron&#xff0c;通过源码、官方文档、部署环境进行学习 neutron-dhcp-agent neutron.agent.dhcp_agent.main 创建server&#xff0c;调oslo_service launch server&#xff0c;最后实际调了server的start方法 neutron.service.Service.start Serv…

编程入门(七)【虚拟机VMware安装Linux系统Ubuntu】

读者大大们好呀&#xff01;&#xff01;!☀️☀️☀️ &#x1f525; 欢迎来到我的博客 &#x1f440;期待大大的关注哦❗️❗️❗️ &#x1f680;欢迎收看我的主页文章➡️寻至善的主页 文章目录 &#x1f525;前言&#x1f680;Ubuntu知多少&#x1f680;安装的前期准备&am…

STM32定时器与PWM对LED灯的控制

文章目录 一、定时器——Timer&#xff08;一&#xff09;概念&#xff08;二&#xff09;分类&#xff08;三&#xff09;功能&#xff08;四&#xff09;结构1.模块一——时基单元2.模块二——输出比较模块 二、实验内容&#xff08;一&#xff09;标准库点亮LED灯1.实验说明…

冯喜运:5.31晚间黄金原油行情分析及尾盘操作策略

【黄金消息面分析】&#xff1a;周五&#xff08;5月31日&#xff09;&#xff0c;最新发布的数据显示&#xff0c;美国4月核心PCE物价指数月率录得0.2%&#xff0c;低于预期(0.3%)&#xff0c;经济学家认为&#xff0c;核心指数比整体指数更能反映通胀。除此之外&#xff0c;美…