Spring RabbitMQ那些事(3-消息可靠传输和订阅)

news2025/1/12 3:04:43

目录

  • 一、序言
  • 二、生产者确保消息发送成功
    • 1、为什么需要Publisher Confirms
    • 2、哪些消息会被确认处理成功
  • 三、消费者保证消息被处理
  • 四、Spring RabbitMQ支持代码示例
    • 1、 application.yml
    • 2、RabbigtMQ配置
    • 3、可靠生产者配置
    • 4、可靠消费者配置
    • 5、测试用例

一、序言

在有些业务场景中,消息是不能丢的,比如分布式事务资金动账,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。

消息是有可能丢的,比如生产者在发送消息时broker服务挂了,消息没有来得及落盘,这时消息就彻底丢了。

保证MQ消息可靠传输主要有两个方面,一方面是消息生产者确保消息一定发送成功,另一方面是消费者确保消息一定被处理。


二、生产者确保消息发送成功

1、为什么需要Publisher Confirms

在Spring AMQP中AmqpTemplate的实现RabbitTemplate已经支持 Publisher Confirms and Returns,所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。

在RabbitMQ官方文档描述中,持久化的消息在Broker重启时也是应该存活的,这里的词用的是应该,因为消息有可能在落地磁盘前Broker就挂了,导致消息丢失。

最直接的解决方案是通过事务,但是通过事务有两个问题:

  • 事务阻塞:发布者必须等待Broker处理完每条消息。
  • 事务很重:每次提交都会要求触发fsync(),强制磁盘,这个过程需要花很长的时间。

备注:在RabbitMQ官方测试中,通过事务去保证,发布10000条消息需要花至少4分钟的时间。

在这里插入图片描述

而通过Publisher Confirm机制,一旦Broker处理完就会确认消息,而且这个过程是异步的,生产者可以流式发布消息,不需要等待Broker,并且Broker会批量高效将消息落盘。

2、哪些消息会被确认处理成功

当Broker确认消息时,会通知消息发布者消息是否被成功处理,成功处理的基本规则如下:

  • 无法路由的mandatory(必须有符合条件的队列)和immediate(必须有消费者在线)类型在被basic.return后会被确认。
  • 非持久化消息在入队时会被确认。
  • 持久化消息当持久化到磁盘或者被消费者消费时会被确认。

三、消费者保证消息被处理

消费者端确保消息消费很简单,关闭消息自动确认就好,开启消息手动确认。当然有些场景消息只能被处理一次,可以通过分布式锁来实现。


四、Spring RabbitMQ支持代码示例

1、 application.yml

server:
  port: 8080
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: /
    publisher-returns: true
    publisher-confirm-type: correlated
    listener:
      type: simple
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 20
        prefetch: 5
    template:
      mandatory: true

备注:

  • 这里一定要设置spring.rabbitmq.publisher-returnstrue,并且设置spring.rabbitmq.publisher-confirm-typecorrelated,同时设置spring.rabbitmq.template.mandatorytrue
  • 上面我们将消费者的确认模式改为了手动确认

2、RabbigtMQ配置

@Configuration
public class RabbitReliableTransportConfig {

	/**
	 * RabbitTemplate消息转换器配置,自动将对象转换为json字符串
	 *
	 * @return
	 */
	@Bean
	public MessageConverter jackson2JsonMessageConverter() {
		Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
		messageConverter.setClassMapper(new DefaultJackson2JavaTypeMapper());
		return messageConverter;
	}

	@Bean
	public Queue reliableQueue() {
		return QueueBuilder.durable("reliable-queue").build();
	}
}

3、可靠生产者配置

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqReliableProducer {

	private final RabbitTemplate rabbitTemplate;

	public void sendReliableMsg(String body) {
		// 发送可靠消息
		ReliableMsgDTO reliableMsgDTO = ReliableMsgDTO.builder().body(body).build();
		CorrelationData correlationData = new CorrelationData();
		rabbitTemplate.convertAndSend("reliable-queue", reliableMsgDTO, correlationData);

		// 发送确认逻辑
		CompletableFuture<Confirm> future = correlationData.getFuture().completable();
		future.whenComplete((confirm, throwable) -> {
			if (confirm.isAck()) {
				log.info("消息已经被成功发送, 消息内容:{}", JSON.toJSONString(reliableMsgDTO));
				return;
			}

			log.warn("消息发送未成功发送, 原因:{}, 消息内容:{}", confirm.getReason(), JSON.toJSONString(reliableMsgDTO), throwable);
			// 5秒后再发送
			LockSupport.parkNanos(5 * 1000 * 1000 * 1000L);
			rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);
		});
	}
}

4、可靠消费者配置

@Slf4j
@Component
public class RabbitMQReliableConsumer {

	@RabbitListener(queues = "reliable-queue")
	public void handleMsgFromQueue(ReliableMsgDTO reliableMsgDTO, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
		channel.basicAck(tag, false);
		// channel.basicNack(tag, false, false);
		log.info("Message received from queue, message body: {}", JSON.toJSONString(reliableMsgDTO));
	}
}

备注:这里我们开启了消息的手动确认,如果消息处理失败没有确认,那么消息将会在下次消费者参加连接时再次被投递。

5、测试用例

测试结果如下,每当消息发送至Broker成功后会触发回调,如果消息发送失败将会触发重新发送。

2024-01-20 18:13:11.399  INFO 12316 --- [78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer       : 消息已经被成功发送, 消息内容:{"body":"hello"}
2024-01-20 18:13:11.399  INFO 12316 --- [ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer       : Message received from queue, message body: {"body":"hello"}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1399910.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

安装MySQL8.0

安装MySQL8.0 第一步我们先把MySQL8.0的镜像拉一下&#xff08;建议在网络好的情况下 下拉镜像&#xff09; 之后我们在创造一个容器 conf目录 必须提前上传my.cnf文件到/data/conf目录 并且它与window中的配置文件my.ini后缀名是不一样 data目录 数据保存到宿主机中&#x…

Laya3.0 相机使用

摄像机&#xff0c;是3D场景里边最经常使用的对象了。 官方文档&#xff1a;点击这里学习 1.投影 Projection 透视&#xff1a; 模拟人眼的视觉效果&#xff0c;近大远小。模拟物理世界的规律&#xff0c;将眼睛或相机抽象成一个点&#xff0c;此时视锥体内的物体投影到视平…

logstack 日志技术栈-04-opensource 开源工具 Syslog-ng+Highlight.io

5. Syslog-ng Syslog-ng 是一个开源的日志管理解决方案&#xff0c;主要用于收集和处理日志数据。它可以从多种源收集日志&#xff0c;包括系统日志、网络设备日志和第三方应用日志。 然后将日志解析、分类、重写和关联到统一格式中&#xff0c;然后将其存储或安全地传输到不同…

【C/Python】用GTK实现多文档窗体程序

一、用C语言 在GTK&#xff08;GIMP Toolkit&#xff09;中实现多文档接口&#xff08;MDI&#xff09;程序可以使用多种方法。GTK本身并没有提供专用的MDI窗口小部件&#xff0c;但可以使用标签页&#xff08;Notebook&#xff09;或多个窗口&#xff08;Window&#xff09;来…

力扣精选算法100题——串联所有单词的字串(滑动窗口专题)

本题链接——串联所有单词的字串 本题和找到字符串中所有字母异位词题目非常相似&#xff0c;思路都是一样。通过自己的大脑能发现其中的相似之处。 第一步&#xff1a;了解题意 就按实例来分析吧&#xff0c;这样更通俗易懂。 words["ab","cd","ef…

小程序学习-19

Vant Weapp - 轻量、可靠的小程序 UI 组件库 ​​​​​ Vant Weapp - 轻量、可靠的小程序 UI 组件库 安装出现问题&#xff1a;rollbackFailedOptional: verb npm-session 53699a8e64f465b9 解决办法&#xff1a;http://t.csdnimg.cn/rGUbe Vant Weapp - 轻量、可靠的小程序…

如何证明一个矩阵是可逆矩阵?

想要证明一个矩阵是可逆矩阵&#xff0c;其实就是要知道可逆矩阵具有哪些性质。荒原之梦考研数学网把线性代数中可逆矩阵的常用性质都整理在下面了&#xff1a;

供应链安全项目in-toto开源框架详解

引言&#xff1a;in-toto 是一个开源框架&#xff0c;能够以密码学的方式验证构件生产路径上的每个组件和步骤。它可与主流的构建工具、部署工具进行集成。in-toto已经被CNCF技术监督委员会 (Technical Oversight Committee&#xff0c;TOC)接纳为CNCF孵化项目。 1. 背景 由于…

String在VS与Linux下的区别

目录 一、string的成员 1.VS 2.Linux 二、string的扩容机制 1. VS 2.Linux 一、string的成员 string是C标准库中的一个类模板&#xff0c;用于表示和操作字符串 string在 Windows 与 Linux 中的成员不是相同的 1.VS 4个成员&#xff1a;_str , _size , _capacity 和…

第三课:GPT

文章目录 第三课&#xff1a;GPT1、学习总结&#xff1a;GPT出现的原因GPT的方法原理目前存在的问题无监督的预训练优化目标模型结构 监督微调课程ppt及代码地址 2、学习心得&#xff1a;3、经验分享&#xff1a;4、课程反馈&#xff1a;5、使用MindSpore昇思的体验和反馈&…

SpringMVC(八)处理AJAX请求

一、处理AJAX之准备工作: 首先我们创建一个新的工程: 我们将pom.xml复制过来: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-in…

JVM之java内存区域[1](程序计数器、栈)

文章目录 版权声明零 运行时数据区一 程序计数器1.1 加载阶段1.2 执行阶段1.3 多线程情况 二 栈2.1 java虚拟机栈2.2 java虚拟机栈帧的组成2.2.1 局部变量表2.2.2 操作数栈2.2.3 帧数据 2.3 栈内存溢出2.4 设置帧大小2.5 本地方法栈 版权声明 本博客的内容基于我个人学习黑马程…

OCS2 入门教程(六)- Double Integrator

系列文章目录 前言 双积分器示例是我们最简单的问题。它模拟了一个沿 x 方向移动的一维点质量。模型是线性的&#xff0c;成本函数是二次函数。目标点通过参考管理器模块设置为二次成本。 一、查看文件结构 1.1 ocs2_double_integrator 文件夹 . ├── auto_generated ├─…

Python爬虫时被封IP,该怎么解决?四大动态IP平台测评

在使用 Python 进行爬虫时&#xff0c;很有可能因为一些异常行为被封 IP&#xff0c;这主要是因为一些爬虫时产生的异常行为导致的。 在曾经的一次数据爬取的时候&#xff0c;我尝试去爬取Google地图上面的商家联系方式和地址信息做营销&#xff0c;可是很不幸&#xff0c;还只…

Centos使用Docker搭建自己的Gitlab(社区版和设置汉化、修改密码、设置SSH秘钥)

根据我的经验 部署Gitlab&#xff08;社区版&#xff09; 至少需要2核4g的服务器 带宽3~4M 1. 在自己电脑上安装终端&#xff1a;宝塔ssl终端 或者 FinalShell&#xff0c;根据喜好安装即可 http://www.hostbuf.com/t/988.html http://www.hostbuf.com/downloads/finalshell_w…

华为原生 HarmonyOS NEXT 鸿蒙操作系统星河版 发布!不依赖 Linux 内核

华为原生 HarmonyOS NEXT 鸿蒙操作系统星河版 发布&#xff01;不依赖 Linux 内核 发布会上&#xff0c;余承东宣布&#xff0c;HarmonyOS NEXT鸿蒙星河版面向开发者开放申请。 申请链接 鸿蒙星河版将实现原生精致、原生易用、原生流畅、原生安全、原生智能、原生互联6大极致原…

原型设计 Axure RP 9

Axure RP 9是一款专业的原型设计和协作工具&#xff0c;让用户快速创建高保真度的交互原型&#xff0c;模拟真实的用户界面和交互体验。该软件界面布局合理&#xff0c;易于使用&#xff0c;提供丰富的交互功能和效果&#xff0c;如动态面板、变量、条件逻辑、动画等。同时支持…

DFA有穷自动机敏感词过滤算法

1.EndType package com.example.utils.wordfilter;/*** 结束类型定义*/ public enum EndType {/*** 有下一个&#xff0c;结束*/HAS_NEXT, IS_END } 2.WordType package com.example.utils.wordfilter;/*** 词汇类型*/ public enum WordType {/*** 黑名单/白名单*/BLACK, WH…

JS-WebAPIs-本地存储(五)

• 本地存储介绍 以前我们页面写的数据一刷新页面就没有了&#xff0c;是不是&#xff1f;随着互联网的快速发展&#xff0c;基于网页的应用越来越普遍&#xff0c;同时也变的越来越复杂&#xff0c;为了满足各种各样的需求&#xff0c;会经常 性在本地存储大量的数据&#xf…

python类继承之__init__函数覆盖问题

目录 1.问题描述 2.代码演示 3.总结 在Python这个广受欢迎的编程语言中&#xff0c;类继承是一项强大而精巧的特性。通过类继承&#xff0c;我们可以构建出更加灵活、可重用和易维护的代码&#xff0c;同时实现代码的模块化和扩展性。 但是如果对于熟悉C和java的人而言&…