RabbitMQ 消息顺序性保证

news2025/2/11 19:31:51

方式一:Consumer设置exclusive

在这里插入图片描述

注意条件

  • 作用于basic.consume
  • 不支持quorum queue
    在这里插入图片描述
    当同时有A、B两个消费者调用basic.consume方法消费,并将exclusive设置为true时,第二个消费者会抛出异常:
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - queue 'test' in vhost '/' in exclusive use, class-id=60, method-id=20)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
	at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
	at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)
	at com.dms.rabbitmq.TopicSender.lambda$main$2(TopicSender.java:63)
	at java.base/java.lang.Thread.run(Thread.java:840)

Spring AMQP 如何通过exclusive实现顺序消费:

在这里插入图片描述
核心逻辑

while (!DirectMessageListenerContainer.this.started && isRunning()) {
   this.cancellationLock.reset();
   try {
      for (String queue : queueNames) {
         consumeFromQueue(queue);
      }
   }
   catch (AmqpConnectException | AmqpIOException e) {
      long nextBackOff = backOffExecution.nextBackOff();
      if (nextBackOff < 0 || e.getCause() instanceof AmqpApplicationContextClosedException) {
         DirectMessageListenerContainer.this.aborted = true;
         shutdown();
         this.logger.error("Failed to start container - fatal error or backOffs exhausted",
               e);
         this.taskScheduler.schedule(this::stop, Instant.now());
         break;
      }
      this.logger.error("Error creating consumer; retrying in " + nextBackOff, e);
      doShutdown();
      try {
         Thread.sleep(nextBackOff); // NOSONAR
      }
      catch (InterruptedException e1) {
         Thread.currentThread().interrupt();
      }
      continue; // initialization failed; try again having rested for backOff-interval
   }
   DirectMessageListenerContainer.this.started = true;
   DirectMessageListenerContainer.this.startedLatch.countDown();
}
  1. 抛出异常后,会重试
  2. 重试间隔、次数受recoveryInterval(默认无限)、recoveryBackOff控制

方式二:single active consumer

在这里插入图片描述

原理:

在这里插入图片描述

代码示例

Channel ch = ...;
Map<String, Object> arguments = newHashMap<String, Object>();
arguments.put("x-single-active-consumer", true);
ch.queueDeclare("my-queue", false, false, false, arguments);

在这里插入图片描述
参考资料:https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2296477.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DeepSeek R1 简单指南:架构、训练、本地部署和硬件要求

DeepSeek R1 简单指南&#xff1a;架构、训练、本地部署和硬件要求 DeepSeek 的 LLM 推理新方法 DeepSeek 推出了一种创新方法&#xff0c;通过强化学习 (RL) 来提高大型语言模型 (LLM) 的推理能力&#xff0c;其最新论文 DeepSeek-R1 对此进行了详细介绍。这项研究代表了我们…

1.攻防世界 unserialize3(wakeup()魔术方法、反序列化工作原理)

进入题目页面如下 直接开审 <?php // 定义一个名为 xctf 的类 class xctf {// 声明一个公共属性 $flag&#xff0c;初始值为字符串 111public $flag 111;// 定义一个魔术方法 __wakeup()// 当对象被反序列化时&#xff0c;__wakeup() 方法会自动调用public function __wa…

【R语言】卡方检验

一、定义 卡方检验是用来检验样本观测次数与理论或总体次数之间差异性的推断性统计方法&#xff0c;其原理是比较观测值与理论值之间的差异。两者之间的差异越小&#xff0c;检验的结果越不容易达到显著水平&#xff1b;反之&#xff0c;检验结果越可能达到显著水平。 二、用…

2025.2.9机器学习笔记:PINN文献阅读

2025.2.9周报 文献阅读题目信息摘要Abstract创新点网络架构实验结论缺点以及后续展望 文献阅读 题目信息 题目&#xff1a; GPT-PINN:Generative Pre-Trained Physics-Informed Neural Networks toward non-intrusive Meta-learning of parametric PDEs期刊&#xff1a; Fini…

JVM(Java 虚拟机)

Java语言的解释性和编译性&#xff08;通过JVM 的执行引擎&#xff09; Java 代码&#xff08;.java 文件&#xff09;要先使用 javac 编译器编译为 .class 文件&#xff08;字节码&#xff09;&#xff0c;紧接着再通过JVM 的执行引擎&#xff08;Execution Engine&#xff09…

利用二分法进行 SQL 盲注

什么是sql注入&#xff1f; SQL 注入&#xff08;SQL Injection&#xff09;是一种常见的 Web 安全漏洞&#xff0c;攻击者可以通过构造恶意 SQL 语句来访问数据库中的敏感信息。在某些情况下&#xff0c;服务器不会直接返回查询结果&#xff0c;而是通过布尔值&#xff08;Tr…

大模型数据集全面整理:444个数据集下载地址

本文针对Datasets for Large Language Models: A Comprehensive Survey 中的 444 个数据集&#xff08;涵盖8种语言类别和32个领域&#xff09;进行完整下载地址整理收集。 2024-02-28&#xff0c;由杨刘、曹家欢、刘崇宇、丁凯、金连文等作者编写&#xff0c;深入探讨了大型语…

Linux 创建进程 fork()、vfork() 与进程管理

Linux 创建进程 fork、vfork、进程管理 一、Linux的0号、1号、2号进程二、Linux的进程标识三、fork() 函数1、基本概念2、函数特点3、用法以及应用场景&#xff08;1&#xff09;父子进程执行不同的代码&#xff08;2&#xff09;进程执行另一个程序 4、工作原理 四、vfork() 函…

2025web寒假作业二

一、整体功能概述 该代码构建了一个简单的后台管理系统界面&#xff0c;主要包含左侧导航栏和右侧内容区域。左侧导航栏有 logo、管理员头像、导航菜单和安全退出按钮&#xff1b;右侧内容区域包括页头、用户信息管理内容&#xff08;含搜索框和用户数据表格&#xff09;以及页…

鸿蒙NEXT API使用指导之文件压缩和邮件创建

鸿蒙NEXT API 使用指导 一、前言二、邮件创建1、拉起垂类应用2、 UIAbilityContext.startAbilityByType 原型2.1、wantParam2.2、abilityStartCallback 与 callback 3、拉起邮箱类应用3.1、单纯拉起邮箱应用3.2、传入带附件的邮件 三、压缩文件1、认识 zlib2、压缩处理2.1、单文…

javaEE-10.CSS入门

目录 一.什么是CSS ​编辑二.语法规则: 三.使用方式 1.行内样式: 2.内部样式: 3.外部样式: 空格规范 : 四.CSS选择器类型 1.标签选择器 2.类选择器 3.ID选择器 4.通配符选择器 5.复合选择器 五.常用的CSS样式 1.color:设置字体颜色 2.font-size:设置字体大小 3…

Spring Boot牵手Redisson:分布式锁实战秘籍

一、引言 在当今的分布式系统架构中,随着业务规模的不断扩大和系统复杂度的日益增加,如何确保多个服务节点之间的数据一致性和操作的原子性成为了一个至关重要的问题。在单机环境下,我们可以轻松地使用线程锁或进程锁来控制对共享资源的访问,但在分布式系统中,由于各个服务…

制药行业 BI 可视化数据分析方案

一、行业背景 随着医药行业数字化转型的深入&#xff0c;企业积累了海量的数据&#xff0c;包括销售数据、生产数据、研发数据、市场数据等。如何利用这些数据&#xff0c;挖掘其价值&#xff0c;为企业决策提供支持&#xff0c;成为医药企业面临的重大挑战。在当今竞争激烈的…

[学习笔记] Kotlin Compose-Multiplatform

Compose-Multiplatform 原文&#xff1a;https://github.com/zimoyin/StudyNotes-master/blob/master/compose-multiplatform/compose.md Compose Multiplatform 是 JetBrains 为桌面平台&#xff08;macOS&#xff0c;Linux&#xff0c;Windows&#xff09;和Web编写Kotlin UI…

Golang 并发机制-7:sync.Once实战应用指南

Go的并发模型是其突出的特性之一&#xff0c;但强大的功能也带来了巨大的责任。sync.Once是由Go的sync包提供的同步原语。它的目的是确保一段代码只执行一次&#xff0c;而不管有多少协程试图执行它。这听起来可能很简单&#xff0c;但它改变了并发环境中管理一次性操作的规则。…

【AI实践】Cursor上手-跑通Hello World和时间管理功能

背景 学习目的&#xff1a;熟悉Cursor使用环境&#xff0c;跑通基本开发链路。 本人背景&#xff1a;安卓开发不熟悉&#xff0c;了解科技软硬件常识 实践 基础操作 1&#xff0c;下载安装安卓Android Studio 创建一个empty project 工程&#xff0c;名称为helloworld 2&am…

【多模态大模型】系列4:目标检测(ViLD、GLIP)

目录 1 ViLD2 GLIP 1 ViLD OPEN-VOCABULARY OBJECT DETECTION VIA VISION AND LANGUAGE KNOWLEDGE DISTILLATION 从标题就能看出来&#xff0c;作者是把CLIP模型当成一个Teacher&#xff0c;去蒸馏他自己的网络&#xff0c;从而能Zero Shot去做目标检测。 现在的目标检测数据…

计算机网络结课设计:通过思科Cisco进行中小型校园网搭建

上学期计算机网络课程的结课设计是使用思科模拟器搭建一个中小型校园网&#xff0c;当时花了几天时间查阅相关博客总算是做出来了&#xff0c;在验收后一直没管&#xff0c;在寒假想起来了简单分享一下&#xff0c;希望可以给有需求的小伙伴一些帮助 目录 一、设计要求 二、…

从零到一:基于Rook构建云原生Ceph存储的全面指南(下)

接上篇&#xff1a;《从零到一&#xff1a;基于Rook构建云原生Ceph存储的全面指南&#xff08;上&#xff09;》 链接: link 六.Rook部署云原生CephFS文件系统 6.1 部署cephfs storageclass cephfs文件系统与RBD服务类似&#xff0c;要想在kubernetes pod里使用cephfs&#…

AutoMQ 如何实现没有写性能劣化的极致冷读效率

前言 追赶读&#xff08;Catch-up Read&#xff0c;冷读&#xff09;是消息和流系统常见和重要的场景。 削峰填谷&#xff1a;对于消息来说&#xff0c;消息通常用作业务间的解耦和削峰填谷。削峰填谷要求消息队列能将上游发送的数据堆积住&#xff0c;让下游在容量范围内消费…