使用SpringBoot实现RabbitMQ各个模式

news2024/12/27 12:14:30

实现了RabbitMQ各个模式(simple、topic、direct、fanout及发送方确认和接收方确认)的一个demo
源码:https://gitee.com/xunan29/study-rabbitmq-test-project

参考文章:
https://blog.csdn.net/K_kzj_K/article/details/106642250
https://blog.csdn.net/qq_41466440/article/details/118567253

项目介绍

这个项目主要是实现了rabbitMQ各个模式(simple、topic、direct、fanout及发送方确认和接收方确认)的一个demo

rabbitmq-publisher-model

rabbitmq-receiver-model

上面俩是一起的,一个生产者,一个消费者

mq-message-callback-publisher-model9092

mq-message-callback-consumer-model9093

上面俩是一起的,一个生产者,一个消费者,其主要是根据上面那两个模块实现了发送方确认和接收方确认模式功能

依赖

		<!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.5.3</version>
        </dependency>

配置文件

server:
  port: 9090
  #优雅关停
  shutdown: graceful

spring:
  lifecycle:
    #强制关停时间(配合优雅关停)
    timeout-per-shutdown-phase: 60s

  application:
    name: rabbitmq-publisher-model

  #配置rabbitMq 服务器
  rabbitmq:
    #    host: 172.2200.10.2
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /

其中需要注意rabbitmq代码访问端口是5672,而不是15672,15672是用来访问可视化管理界面的

config
在这里插入图片描述

config是按各个模式需求,配置了相应的队列、交换机和binding,为什么在消费者中也配置了:是因为如果没配置在最开始时没有先启动生产者,消费者这边会报错,消费者这边配置了就避免了这个问题。

controller

生产者的controller用来生产消息

service

消费者的service用来消费信息

其中work模式我没有实现,如果要写的话它就是在simple模式上,多加个消费者就可,它只会有一个消费者消费消息,参考:https://blog.csdn.net/qq_39240694/article/details/106911755

还有需要注意,如果消息传递java对象则必须java对象是在同一个模块或者提取到一个公用模块中(即消费者和生产者在一个模块才能使用),我项目中这种虽然在两个模块但同包同类名同路径也是不可以使用的

发送方确认和接收方确认

配置文件中加一些新配置

#配置rabbitMq 服务器
  rabbitmq:
    #    host: 172.2200.10.2
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /

    #确认消息已发送到交换机(Exchange)
    #springboot.rabbitmq.publisher-confirm 新版本(2.2.0之后)已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果
    #publisher-confirms: true
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true

配置相关的消息确认回调函数,RabbitConfig.java

	@Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
 
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
                System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
                System.out.println("ConfirmCallback:     "+"原因:"+cause);
            }
        });
 
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("ReturnCallback:     "+"消息:"+message);
                System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
                System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
                System.out.println("ReturnCallback:     "+"交换机:"+exchange);
                System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
            }
        });

上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?

先从总体的情况分析,推送消息存在四种情况:

①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送到sever,交换机和队列啥都没找到
④消息推送成功

①消息推送到server,但是在server里找不到交换机

也就是这里有交换机但是没有创建也没有配置

结论: ①这种情况触发的是 ConfirmCallback 回调函数。

②消息推送到server,找到交换机了,但是没找到队列

这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作

结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

③消息推送到sever,交换机和队列啥都没找到

结论: ③这种情况触发的是 ConfirmCallback 回调函数。

④消息推送成功

结论: ④这种情况触发的是 ConfirmCallback 回调函数。

消费者接收到消息的消息确认机制

和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:

①自动确认, 这也是默认的消息确认情况。  AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

② 根据情况确认, 这个不做介绍
③ 手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
basic.ack用于肯定确认 
basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展) 
basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息 

消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。
而basic.nack,basic.reject表示没有被正确处理:

着重讲下reject,因为有时候一些场景是需要重新入列的。

channel.basicReject(deliveryTag, true);  拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。

使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。

但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。

 

顺便也简单讲讲 nack,这个也是相当于设置不消费某条消息。

channel.basicNack(deliveryTag, false, true);
第一个参数依然是当前消息到的数据的唯一id;
第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。

同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

项目中MessageListener 的config类进行了消息接收 手动确认 配置

==MyAckReceiver ==(在service里)是对应的手动确认消息监听类

上述的类每个都有两个,一个对应处理一个队列,另一个对应处理多个队列

里面配置在注释中有详细介绍,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/51043.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[ Linux ] Linux信号概述 信号的产生

目录 0.问题引入&#xff1a; 0.1 将进程设置为后台进程 0.2 查看后台进程并将后台进程提至前台 0.3 将前台进程设置为后台进程 1.信号的概念 2.查看信号列表 3.信号处理的常见方式 4.信号的产生 4.1 用户层产生信号的方式 4.1.1通过终端按键产生信号 4.1.2调用系统函…

【数据集研究】PASCAL VOC 2007

目录1、数据集地址2、适用的比赛1&#xff09;Main Competitions2&#xff09;Taster Competitions3、类别及类别的定义1&#xff09;数据集包含的类别2&#xff09;类别的定义4、数据集1&#xff09;训练集、验证集、测试集2&#xff09;图片和待检测物在类别的分布详情5、标注…

Kamiya丨Kamiya艾美捷小鼠BDNF ELISA原理分析

Kamiya艾美捷小鼠BDNF ELISA预期用途&#xff1a; 小鼠BDNF ELISA用于定量测定小鼠细胞培养物上清液、细胞裂解物、细胞培养物中的BDNF&#xff0c; 血清和血浆&#xff08;肝素、EDTA、柠檬酸盐&#xff09;。仅供研究使用。 引言&#xff1a; 脑源性神经营养因子&#xff…

多线程与高并发(13)——Java常见并发容器总结

本文总结常见的并发容器&#xff0c;包含ConcurrentHashMap、CopyOnWriteArrayList 、ConcurrentLinkedQueue、BlockingQueue 、ConcurrentSkipListMap&#xff0c;本文仅做简单的总结&#xff0c;不做详细的源码分析。 一、ConcurrentHashMap HashMap不是线程安全的&#xf…

Linux基本命令(三)——服务器搭建

搭建简单Web服务器 安装web服务 yum -y install httpd 启动httpd服务 systemctl start httpd查看httpd是否开启成功 service httpd status以下是状态信息&#xff1a; 重新启动httpd systemctl restart httpd6.进入主配置文件 vim /etc/httpd/conf/httpd.conf编辑自配置文件 v…

FPGA控制W5500完成UDP环回测试

FPGA控制W5500完成UDP环回测试&#xff11; 前言&#xff12; 前期准备&#xff13; &#xff37;5500寄存器描述4 &#xff37;5500 环回测试4.1 W5500初始化4.1.1 通用寄存器初始化4.1.2 socket寄存器初始化4.2 W5500数据接收4.3 W5500数据发送4.4 数据环回5 总结&#x…

RKMEDIA--VO的使用

这一节主要说说rkmedia vo模块的使用。 rkmedia的vo是对DRM接口的封装&#xff0c;提供给用户更方便的使用&#xff0c;rv1126/rv1109支持两个vo图层。 1、首先先介绍一下DRM的测试命令--modetest&#xff0c;用来确认当前屏幕能够正常点亮。 modetest -M rockchip //打印出…

融云艾瑞发布《政企数智办公平台行业研究报告》,解读数智化时代的办公新趋势

关注公众号文章扫码报名融云&艾瑞“政企数智办公报告及新品发布会” 近期&#xff0c;安全可信的通信云服务商融云&#xff0c;携手业内权威研究机构艾瑞咨询联合发布《2022 年中国政企数智办公平台行业研究报告》&#xff08;下简称《报告》&#xff09;&#xff0c;回顾政…

Kotlin高仿微信-第28篇-朋友圈-预览图片、预览小视频

Kotlin高仿微信-项目实践58篇详细讲解了各个功能点&#xff0c;包括&#xff1a;注册、登录、主页、单聊(文本、表情、语音、图片、小视频、视频通话、语音通话、红包、转账)、群聊、个人信息、朋友圈、支付服务、扫一扫、搜索好友、添加好友、开通VIP等众多功能。 Kotlin高仿…

ios照片误删怎么恢复,iphone已经删除的照片怎么恢复

苹果手机里面的重要照片被删除了&#xff0c;相信很多人都比较着急&#xff0c;想要想办法找回来。ios照片误删怎么恢复&#xff1f; 方法1.通过“最近删除”恢复照片 苹果删除的照片如何恢复&#xff1f;一般情况下&#xff0c;从苹果手机刚删除的照片会暂存在“最近删除”这…

使用MAT分析线上问题实战

概述 MAT&#xff0c;下载地址&#xff0c;Eclipse Memory Analysis Tools&#xff0c;一个分析Java堆数据的专业工具&#xff0c;可以计算出内存中对象的实例数量、占用空间大小、引用关系等&#xff0c;可得知哪些线程阻止垃圾收集器的回收工作&#xff0c;从而定位内存泄漏…

如何低成本减少企业知识流失?天翎知识文档系统+群晖NAS值得一试

编者按&#xff1a;知识管理可以减少企业知识流失&#xff0c;有效提高企业员工工作水平&#xff0c;增强企业综合竞争力。如何小成本做好企业知识管理呢&#xff1f;天翎知识文档系统群晖NAS值得一试。 关键词&#xff1a;标签分类&#xff0c;权限管理&#xff0c;在线预览&…

Git学习

Git是什么 Github作为最大的代码托管平台&#xff0c;是基于Git开发的 Git是最优秀的版本控制工具 iCode是基于Git的代码托管平台 版本控制&#xff1a;是对软件开发过程中各种程序代码&#xff0c;配置文件&#xff0c;说明文档等。 版本控制系统&#xff1a;集中式、分布式 …

在Mac中管理多版本 java——安装和使用 jenv

jenv 的 github 地址:https://github.com/jenv/jenv 安装 $ brew install jenv安装成功后需要进行一下简单的配置,让它可以起作用 使用Bash的情况$ echo export PATH="$HOME/.jenv/bin:$PATH" >> ~/.bash_profile $ echo eval "$(jenv init -)" &…

MAC 搭建vue开发环境,配置环境变量

1.官网下载nodejs安装包 http://nodejs.p2hp.com/ 下载完成后安装&#xff0c;一直点击下一步即可 2.自定以配置全局模块路径和缓存路径 先自己找一个路径创建两个文件夹&#xff0c;node_cache 和 node_global 打开终端&#xff0c;执行一下俩条命令,注意引号中的路径要换…

图纸识别自动生成BOM清单的方法

01 方案应用领域及行业 高端装备制造业行业、离散型制造业、电气机械和器材制造业等。 02 方案应用场景 某特变电工公司是国内输变电行业的核心骨干企业&#xff0c;每年生产产能巨大&#xff0c;拥有海量的技术图纸&#xff0c;因此技术人员人工拆解设计图纸的工作难度系数大…

【优化调度】基于改进遗传算法的公交车调度排班优化的研究与实现(Matlab代码实现)

目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码 1 概述 本文对当前公交企业调度系统进行了分析&#xff0c;建立了公交排班的数学模型。本文基于数据挖掘分析的结果上&#xff0c;使用截面客流量数据对模型进行约束&#xff0c;得出了公交客流出行的空间分布规律。再以…

软件测试1

这里写自定义目录标题软件测试的定义1、软件测试的目的2、软件测试的要求3、测试与开发的模型3.1 测试的工作流程3.2 开发模型3.2.1 瀑布模型3.2.2 增量模型3.2.3 快速模型3.2.4 其他模型3.3 测试模型4、测试与开发的关系5、软件测试分类软件测试的定义 找Bug,发现缺陷。使用人…

全球科学家给孩子的stem课【001-046】mp3合集

全球科学家给孩子的stem课【001-046】mp3合集&#xff0c;适合给孩子们启蒙教育使用。 一只蚂蚁走丢后的8种可能结局.mp3 下雨天清爽的味道&#xff0c;来自尸体和臭气.mp3 为什么光的三原色是红绿蓝&#xff0c;绘画的三原色又成了红黄蓝&#xff1f;.mp3 为什么剖腹产的孩子…

患有癌症的心力衰竭患者LVAD植入前景可期

相关调查显示&#xff0c;患有终末期心力衰竭的癌症患者&#xff08;有癌症史&#xff09;正在增加&#xff0c;但其进行心脏移植的可能性不大&#xff0c;而左心室辅助装置&#xff08;LVAD&#xff09;是一种可选择的替代策略。近日&#xff0c;发表于Circulation: Heart Fai…