Spring Boot整合RabbitMQ之路由模式(Direct)

news2025/1/20 19:27:48

RabbitMQ中的路由模式(Direct模式)应该是在实际工作中运用的比较多的一种模式了,这个模式和发布与订阅模式的区别在于路由模式需要有一个routingKey,在配置上,交换机类型需要注入DirectExchange类型的交换机bean对象。在交换机和队列的绑定过程中,绑定关系需要在绑定一个路由key。由于在实际的工作中不大可能会用自动确认的模式,所以我们在整合路由模式的过程中,依然采用发送消息双确认机制和消费端手动确认的机制来保证消息的准确送达与消息防丢失。

1. 添加配置

在配置文件中,配置rabbitmq的相关账号信息,开启消息发送回调机制,配置文件其实和发布订阅模式是一样的。配置详情如下:

server:
  port: 10001

spring:
  application:
    name: springboot-rabbitmq-s1
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # 发送者开启 return 确认机制
    publisher-returns: true
    # 发送者开启 confirm 确认机制
    publisher-confirm-type: correlated

2. 创建配置类

    创建配置类RabbitMQConfig,用于声明交换机、队列,建立队列和交换机的绑定关系,注入RabbitTemplate的bean对象。配置类详情如下:
package com.study.rabbitmq.config;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author alen
 * @DATE 2022/6/7 23:50
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "direct-order-exchange";
    public static final String SMS_QUEUE = "sms-direct-queue";
    public static final String EMAIL_QUEUE = "email-direct-queue";
    public static final String WECHAT_QUEUE = "wechat-direct-queue";

    /**
     * 1.
     * 声明交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        /**
         * directExchange的参数说明:
         * 1. 交换机名称
         * 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
         * 3. 是否自动删除 false:不自动删除 true:自动删除
         */
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 2.
     * 声明队列
     * @return
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue构造函数参数说明
         * 1. 队列名
         * 2. 是否持久化 true:持久化 false:不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }

    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }

    /**
     * 3.
     * 队列与交换机绑定
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }

    @Bean
    public Binding wechatBinding() {
        return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");
    }

    /**
     * 将自定义的RabbitTemplate对象注入bean容器
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启消息推送结果回调
        rabbitTemplate.setMandatory(true);
        //设置ConfirmCallback回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("==============ConfirmCallback start ===============");
                log.info("回调数据:{}", correlationData);
                log.info("确认结果:{}", ack);
                log.info("返回原因:{}", cause);
                log.info("==============ConfirmCallback end =================");
            }
        });
        //设置ReturnCallback回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("==============ReturnCallback start ===============");
                log.info("发送消息:{}", JSONUtil.toJsonStr(message));
                log.info("结果状态码:{}", replyCode);
                log.info("结果状态信息:{}", replyText);
                log.info("交换机:{}", exchange);
                log.info("路由key:{}", routingKey);
                log.info("==============ReturnCallback end =================");
            }
        });
        return rabbitTemplate;
    }
}

3. 消费者配置

    在消费者项目的配置文件中开启手动确认,配置详情如下:
server:
  port: 10002

spring:
  application:
    name: springboot-rabbitmq-s2
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        # 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
        acknowledge-mode: manual

4. 创建消费者

分别创建三个消费者,DirectEmailConsumer、DirectSmsConsumer、DirectWechatConsumer来监听对应的队列,有消息后进行消费,三个消费者大同小异,分别如下

4.1 DirectEmailConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:54
 */
@Slf4j
@Service
@RabbitListener(queues = {"email-direct-queue"}) //监听队列
public class DirectEmailConsumer {

    //标记消费者逻辑执行方法
    @RabbitHandler
    public void emailMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("Email direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.2 DirectSmsConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"sms-direct-queue"}) //监听队列
public class DirectSmsConsumer {

    @RabbitHandler
    public void smsMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("sms direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.3 DirectWechatConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author chaoxian.wu
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"wechat-direct-queue"}) //监听队列
public class DirectWechatConsumer {

    @RabbitHandler
    public void wechatlMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("wechat direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

以上就是全部的代码部分,接下来我们在进入测试,看看实际效果如何,先发布一个routingKey=sms的消息,查看是不是只有对应的一个队列中接收到消息,消息发送详情:

package com.study.rabbitmq;

import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        for (long i = 1; i < 2; i++) {
            //交换机名称
            String exchangeName = "direct-order-exchange";
            //路由key
            String routingKey = "sms";
            Order order = buildOrder(i);
            orderService.createOrder(order, routingKey, exchangeName);
        }
    }

    private Order buildOrder(long id) {
        Order order = new Order();
        order.setRequestId(id);
        order.setUserId(id);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setAmount(10L);
        order.setGoodsNum(1);
        order.setTotalAmount(10L);
        return order;
    }
}

我们登录rabbitmq管理后台查看下,只有sms-direct-queue这个队列有一条消息,效果如下:

我们启动消费者,看下是不是只有监听了sms-direct-queue这个队列的消费者有消费日志,效果如下:

再发一条routingKey=email的消息,消费的日志,效果图示如下

到此其实已经springboot整合rabbitmq的路由模式结束了,这种模式在工作中还是比较常见的,我们演示的是单点的效果,实际工作中,不大可能会使用服务单点部署,现在都讲究服务的高可用,就得服务集群部署,又会涉及到消息重复消费的问题需要处理,我个人觉得,遇到重复消费问题,我第一时间想到的就是分布式锁,哈哈~。但是锁什么呢?肯定是消息中的具备唯一性的属性。来达到防止消息的重复消费。

整个过程中,其实还存在一个小问题没有验证,就是ReturnCallback回调机制没有触发,因为这个得发生在交换机将消息发送到队列的时候失败才会触发,那么我们就发送一个不存在的routingKey就可以触发了,我们发送一个routingKey=duanxin的消息,这个肯定不会发送成功,我们通过断点来看看效果,效果如下:

然后我们常见的就全部整合完成了,当然,开启了双确认机制,虽然我们可以检测到消息投送的结果,然后可以针对投送失败的结果进行预警。但是开启了这个操作,就必然会对消息的处理效率产生影响。所以还得根据实际业务场景而定是否需要使用这个确认机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/933881.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自动化测试 —— Pytest测试框架

01 | 简介 Pytest是一个非常成熟的全功能的Python测试框架&#xff0c;主要有以下特点&#xff1a; 简单灵活&#xff0c;容易上手&#xff0c;文档丰富 支持参数化&#xff0c;可以细粒度地控制测试用例 支持简单的单元测试与复杂的功能测试&#xff0c;还可以用来做Seleni…

软考A计划-系统集成项目管理工程师-法律法规-下

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列点击跳转>蓝桥系列 &#x1f449;关于作者 专注于Android/Unity和各种游…

小研究 - Java虚拟机即时编译器的一种实现原理

深入分析了 &#xff2b;&#xff41;&#xff46;&#xff46;&#xff45;虚拟机的 &#xff2a;&#xff29;&#xff34;&#xff08;&#xff2a;&#xff55;&#xff53;&#xff54;&#xff0d;&#xff29;&#xff4e;&#xff0d;&#xff34;&#xff49;&#xf…

websocket和uni-app里使用websocket

一、HTTP是无状态协议 特点&#xff1a; 1、浏览器发送请求时&#xff0c;浏览器和服务器会建立一个连接。完成请求和响应。在http1.0之前&#xff0c;每次请求响应完毕后&#xff0c;会立即断开连接。在http1.1之后&#xff0c;当前网页的所有请求响应完毕后&#xff0c;才断…

64位ATT汇编语言调用自己编写的两个数相加函数,使用printf输出,发现报错Segmentation fault

cat /etc/redhat-release看到操作系统是CentOS Linux release 7.6.1810&#xff0c;uname -r看到内核版本是3.10.0-957.el7.x86_64&#xff0c;gcc --version可以看到gcc版本是12.2.0&#xff0c;gdb --version可以看到gdb版本是12.1。 twoNumberPlus.s里边的内容如下&#x…

开源游戏开发:机会与挑战

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

机器学习技术

机器学习技术是什么&#xff1f; 机器学习技术&#xff08;Machine Learning&#xff0c;ML&#xff09;是一种人工智能的分支&#xff0c;它关注如何通过数据和模型&#xff0c;让计算机自动从经验中学习&#xff0c;改进性能&#xff0c;并不断提高任务的准确性。机器学习的…

网络安全(红队)自学学习路线

想自学网络安全&#xff08;黑客技术&#xff09;首先你得了解什么是网络安全&#xff01;什么是黑客&#xff01; 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全…

Go并发可视化解释 - Select语句

昨天&#xff0c;我发布了一篇文章&#xff0c;用可视化的方式解释了Golang中通道&#xff08;Channel&#xff09;的工作原理。如果你对通道的理解仍然存在困难&#xff0c;最好呢请在阅读本文之前先查看那篇文章。作为一个快速的复习&#xff1a;Partier、Candier 和 Stringe…

ChatGPT 与 Python进行动态可视化分析

Python数据分析目前最为热门的岗位操作。 想使用Python进行可视化分析&#xff0c;但是又不想写代码&#xff0c;测试&#xff0c;验证。可以交给ChatGPT&#xff0c;open AI 来进行操作。 这样的动态图显示&#xff0c;我们只需要给ChatGPT发送一个指令&#xff0c;人工智能就…

AUTOSAR规范与ECU软件开发(实践篇)6.6 BSW模块代码生成

在BCT界面中配置完所需要的BSW模块后&#xff0c; 可以进行BSW模块相关代码与描述文件的生成&#xff0c; 点击ISOLAR-A主菜单中“ ”右边箭头&#xff0c; 选择Run Configuraions&#xff0c; 如图6.57所示。 将弹出如图6.58所示的界面。 图6.57 Run Configuraions配置&#x…

软件测试 day2

今天目标 能对穷举场景设计测试点 能对限定边界规则设计测试点 能对多条件依赖关系进行设计测试点 能对于项目业务进行设计测试点一、解决穷举场景 重点&#xff1a;使用等价类划分法 1.1 等价类划分法 重点&#xff1a;有效等价和单个无效等价各取1个即可。 步骤&#xff1a;…

R包开发-2.1:在RStudio中使用Rcpp制作R-Package(更新于2023.8.23)

目录 0-前言 1-在RStudio中创建R包项目 2-创建R包 2.1通过R函数创建新包 2.2在RStudio通过菜单来创建一个新包 2.3关于R包创建的说明 3-添加R自定义函数 4-添加C函数 0-前言 目标&#xff1a;在RStudio中创建一个R包&#xff0c;这个R包中包含C函数&#xff0c;接口是Rc…

帆软报表系统SSRF

有子曰&#xff1a;“信近于义&#xff0c;言可复也。恭近礼&#xff0c;远耻辱也。因不失其亲&#xff0c;亦可宗也。” SSRF 构造payload&#xff0c;访问漏洞url&#xff1a; /ReportServer?opresource&resourcehttp://x.x.x漏洞证明&#xff1a; 文笔生疏&#xf…

Redisson分布式锁 原理源码 分析

# 基于setnx实现的分布式锁存在的问题&#xff1a; # 为了解决上面的问题&#xff0c;可以用Redisson # Redisson入门 # Redisson可重入锁原理 获取锁的Lua脚本&#xff1a; 释放锁的Lua脚本&#xff1a; # 锁重试原理分析 tryLock&#xff08;&#xff09;底层代码分析 tim…

在metallb基础上使用 ingress-nginx

vi nginx-ingress.yaml 由于使用了metallb &#xff0c;这里需要把对外暴露service的方式改成 LoadBalancer type: LoadBalancer#type: NodePort apiVersion: v1 kind: Namespace metadata:name: ingress-nginx --- apiVersion: v1 automountServiceAccountToken: true kind…

2022年09月 C/C++(四级)真题解析#中国电子学会#全国青少年软件编程等级考试

第1题:最长上升子序列 一个数的序列bi,当b1 < b2 < … < bS的时候,我们称这个序列是上升的。对于给定的一个序列(a1, a2, …, aN),我们可以得到一些上升的子序列(ai1, ai2, …, aiK),这里1 <= i1 < i2 < … < iK <= N。比如,对于序列(1, 7, 3, 5…

计算机丢失msvcp110.dll是什么意思?有哪些方法可以修复

今天&#xff0c;我将和大家一起探讨一个关于计算机的问题——“计算机丢失msvcp110.dll是什么意思&#xff1f;有哪些方法可以修复&#xff1f;”这个问题在我们的日常生活中非常常见&#xff0c;尤其是在使用Windows系统的过程中&#xff0c;可能会遇到这样的问题。那么&…

基于Java+SpringBoot+Vue前后端分离体育馆管理系统设计和实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

代码随想录算法训练营第四十六天|139.单词拆分、多重背包、背包问题总结

139.单词拆分 ★ 文档讲解 &#xff1a; 代码随想录 - 139.单词拆分 状态&#xff1a;再次回顾。&#xff08;★&#xff1a;需要多次回顾并重点回顾&#xff09; 本题其实不套完全背包思路来理解反而更简单易懂一点。 动态规划五部曲&#xff1a; 确定dp数组&#xff08;dp ta…