Rabbitmq Fanout如何保证不重复消费及应用场景

news2025/1/9 20:09:53

rabbitmq fanout业务场景,一个交换机对应多个队列,不会重复消费吗

在 RabbitMQ 中,使用 Fanout 类型的交换机时,确实可以将一个交换机绑定到多个队列。每当有消息发布到这个交换机时,交换机会把消息广播到所有绑定的队列。

不会重复消费的原因

  1. 队列与消费者关系

    • 每个队列都有一个或多个消费者来处理消息。RabbitMQ 确保每条消息只能被同一个队列的一个消费者消费。
    • 消息消费是按队列级别的,即每个队列中的消费者在消费时只能消费该队列中的消息,不同队列之间是独立的。
  2. 广播到多个队列

    • 当一个 Fanout 类型的交换机收到一条消息时,交换机会将该消息广播到所有与之绑定的队列。每个队列都独立维护一份消息副本,消息会按队列分发,而 每个队列 内的消息 只会被一个消费者消费
    • 也就是说,如果有多个队列与该交换机绑定,并且这些队列各自有消费者,那么每个消费者只会从自己所在的队列中取消息进行消费,而不会重复消费其他队列中的消息。
  3. 不会重复消费的核心原理

    • 假设有一个 Fanout 交换机与多个队列绑定,且每个队列有一个或多个消费者。
    • 当交换机广播消息时,每个队列都会得到该消息,但每个队列中的消费者只会从该队列中消费消息,不会跨队列消费。
    • 例如,如果有两个队列 queue1 和 queue2,并且每个队列都有一个消费者,那么每个消费者只会消费自己队列的消息,而不会消费其他队列中的消息。

举个例子

假设我们有一个 fanout 类型的交换机,名为 logs,并且我们绑定了两个队列 queue1queue2 到这个交换机。

  1. 消息发送

    • 如果向 logs 交换机发送了一条消息,RabbitMQ 会把这条消息广播到 queue1 和 queue2
  2. 队列消费

    • 假设 queue1 上有一个消费者 consumer1queue2 上有一个消费者 consumer2
    • consumer1 会从 queue1 中消费消息,而 consumer2 会从 queue2 中消费消息。
    • 每个消费者只会消费自己队列中的消息,因此不会重复消费。

总结

  • Fanout 交换机是广播消息到所有绑定的队列,每个队列都会接收到一份消息副本。
  • 消息 不会在不同的队列间重复消费,而是由各自队列内的消费者独立消费。
  • 如果同一个队列有多个消费者,它们会按照队列中的负载均衡机制分摊消费任务,但同一个队列中的消息 只能被一个消费者消费

所以,Fanout 交换机下的多个队列不会导致消息的重复消费,每个队列独立处理自己的消息

实际业务场景

但是一个消息被同时发送到多个队列中,就算每个队列中只能被一个消费者消费,不还是消费多次吗,请从实际业务场景给出例子吧,比如订单库存

是的,你说得对,如果一个消息被同时发送到多个队列中,每个队列内的消息会被独立消费,确实会出现“同一个消息被多个消费者消费”的情况。这种情况需要根据实际业务场景来设计系统,以确保消息的消费不会造成问题。以下是一个 订单库存管理 的实际业务场景例子来帮助说明:

场景背景:订单库存管理

假设你有一个电商系统,当用户下单后,系统需要做两件事:

  1. 扣减库存:减少商品库存。
  2. 创建订单:保存订单信息并做相关处理。

为了实现高可用和高并发,系统将这些操作拆分成两个独立的服务,并且通过消息队列来进行异步处理。你选择使用 RabbitMQ 来传递消息。

业务流程
  1. 用户下单

    • 用户提交订单时,系统会生成一条订单消息,并将该消息发送到多个队列:
      • 一个队列负责处理 扣减库存
      • 另一个队列负责 创建订单
  2. 消息发送

    • 订单消息通过 Fanout 类型的交换机广播到两个队列:order_queue 和 stock_queue
    • 这些队列都绑定到同一个交换机,因此每个队列都会接收到相同的订单消息。
  3. 消息消费

    • order_queue 会有一个消费者来处理订单创建的操作。
    • stock_queue 会有另一个消费者来处理库存扣减的操作。
消费者行为
  • order_queue 的消费者:消费者会从 order_queue 队列中取出消息并执行 订单创建 操作,保存订单记录到数据库中。

  • stock_queue 的消费者:消费者会从 stock_queue 队列中取出消息并执行 库存扣减 操作,更新商品库存。

关键问题:消息重复消费

在这种场景下,同一条订单消息会同时被发送到多个队列,因此 每个队列内的消息都将被独立消费

这确实会让你产生一个疑问:同一条消息被发送到多个队列后,会不会在消费者中被重复消费,从而导致业务错误(例如扣减库存两次或创建两次订单)?

答案:不会重复消费

虽然每个队列都会收到相同的消息,但 每个队列只会被单独的消费者消费,且每个消费者只会消费自己队列中的消息。例如:

  • order_queue 的消费者 只负责处理订单创建,而不会去处理库存扣减。
  • stock_queue 的消费者 只负责处理库存扣减,而不会去处理订单创建。

因此,尽管消息在队列之间是广播的,但由于消费者是独立处理不同队列的消息,所以不会导致重复消费 同一条消息,从而避免了逻辑错误。

为什么这样设计?

这种设计的关键在于 解耦和职责分离,这样可以将不同的业务逻辑分开,使得系统具备更好的扩展性和容错性。每个消费者只负责一个独立的任务,避免了一个消费者过载或失败时影响到其他任务。

具体例子

假设有一个用户下单的请求,系统生成一条订单消息 order_msg

  1. order_msg 被发送到 Fanout 交换机,并广播到两个队列:

    • order_queue(负责创建订单)
    • stock_queue(负责扣减库存)
  2. 消息到达两个队列:

    • order_queue 中的消费者(假设消费者 A)消费该消息并创建订单。
    • stock_queue 中的消费者(假设消费者 B)消费该消息并扣减库存。
  3. 结果:

    • 订单创建操作和库存扣减操作是 独立的,并且各自只处理一个特定的任务。因此,每个队列中的消费者处理的都是各自队列的消息,而不会发生重复消费。

高并发情况下的注意事项

在高并发场景下,虽然一个消息会被发送到多个队列,但由于不同队列的消费者独立工作,每个消费者会处理自己队列中的消息,不会发生重复消费。为了避免在不同队列间出现竞争问题,可能需要在系统设计时考虑:

  • 事务性操作:例如在创建订单和扣减库存的过程中,可以使用数据库的事务机制来保证两者的原子性,避免出现只扣减库存但没有创建订单的情况,或反之。
  • 幂等性:为了确保即使消息被重复投递或重复处理,系统也能保证业务逻辑的正确性和一致性,通常需要在消费者端做幂等性设计。

总结

虽然在 Fanout 交换机的场景下,每个消息会被广播到多个队列,并且每个队列的消息会被独立消费,但 每个队列的消费者只会消费自己的队列中的消息。因此,消息不会在不同消费者之间重复消费,不会导致库存扣减和订单创建的重复操作。

订单与库存分别是不同的服务,如何做到事务性操作

当订单与库存是不同的服务时,保证两者的事务性操作变得复杂,因为它们通常运行在不同的进程、不同的数据库、甚至不同的物理主机上。在这种微服务架构下,传统的数据库事务(ACID)无法跨服务进行,因此需要采用分布式事务的方案来保证订单创建与库存扣减操作的一致性。

常见的分布式事务解决方案有 两阶段提交(2PC)TCC(Try-Confirm-Cancel)最终一致性(通过补偿机制)等。对于订单和库存这种场景,通常推荐的方案是通过 消息队列 + 补偿机制Saga模式 来确保最终一致性。

以下是几种常见的实现方法:

1. 基于消息队列 + 补偿机制(最常见的解决方案)

在这个方案中,订单服务和库存服务通过消息队列进行解耦,消息队列提供了异步、可靠的消息传递机制。具体流程如下:

1.1 订单服务和库存服务的操作流程
  1. 用户下单

    • 订单服务收到用户下单请求后,首先创建订单并生成订单ID。
    • 然后,订单服务会将一个“扣减库存”的消息发送到消息队列(例如,RabbitMQ、Kafka 等)。
  2. 库存服务处理消息

    • 库存服务从消息队列中消费“扣减库存”的消息,进行库存扣减操作。
    • 如果库存扣减成功,库存服务将发送一个“扣减库存成功”的消息回给订单服务,表示库存操作成功。
  3. 订单服务确认库存扣减

    • 订单服务收到“扣减库存成功”的确认消息后,确认订单创建的事务完成,最终提交订单。
  4. 失败处理

    • 如果库存服务在处理扣减库存时失败(例如,库存不足,或者系统崩溃),可以通过回退机制来补偿之前的订单操作。
    • 例如,订单服务可以设置一个 超时回调,如果在规定时间内没有收到库存服务的确认消息,则触发补偿操作(如取消订单或通知用户)。
1.2 补偿机制

补偿机制的核心思想是,当某个操作失败时,通过补偿操作来回滚之前成功的操作。例如,如果库存扣减失败,可以通过回滚库存并取消之前成功创建的订单。

在实际操作中,订单和库存的补偿可以通过以下方式实现:

  • 订单补偿:如果库存扣减失败,订单服务可以通过 API 调用回滚订单(例如删除订单或标记为失败)。
  • 库存补偿:如果订单未成功提交(例如用户取消订单),库存服务可以通过 API 调用恢复库存。
1.3 如何实现幂等性

为了避免消息重复消费导致的副作用(比如库存扣减两次、订单创建多次),通常需要在消息处理时实现 幂等性

  • 库存服务:可以根据订单ID来判定是否已经处理过该消息。如果已经扣减过库存,则不再重复操作。
  • 订单服务:可以通过唯一的订单ID来判断订单是否已经提交,避免重复创建订单。

2. 基于Saga模式

Saga模式是一种分布式事务的模式,适用于微服务架构中的跨服务事务。在Saga模式中,长事务被分解为一系列的局部事务,每个局部事务在执行完后都会发布一个事件或发送消息给下游服务。Saga模式的关键在于 补偿事务,即每个局部事务都有一个相应的补偿操作。

2.1 Saga模式的流程
  • Step 1:订单服务收到用户下单请求后,创建订单并保存到数据库,同时发送一条“扣减库存”的消息。
  • Step 2:库存服务接收到扣减库存的消息,进行库存扣减。如果扣减成功,则返回确认消息;如果失败,则通过补偿机制发出库存不足的消息。
  • Step 3:订单服务收到确认消息后提交订单。如果库存扣减失败,订单服务通过补偿操作回滚订单。
  • Step 4:如果库存扣减失败,库存服务会进行补偿操作,比如恢复库存。
2.2 事务补偿

Saga模式的关键在于如何定义补偿操作(即回滚操作)。在我们的场景中,补偿操作可以是:

  • 订单补偿:如果库存扣减失败,订单服务需要取消订单操作,可能是删除订单或标记为“失败”。
  • 库存补偿:如果订单取消或失败,库存服务需要恢复库存。

3. TCC(Try-Confirm-Cancel)模式

TCC模式也是一种分布式事务的方案,它通过三个操作来保证分布式事务的一致性:

  • Try:尝试执行操作,确保资源可用并且锁定资源。
  • Confirm:在所有的Try操作成功后,执行确认操作。
  • Cancel:如果Try操作失败或最终无法确认,则执行取消操作。
3.1 TCC模式流程
  1. 订单服务(Try):订单服务调用库存服务的 try 接口,检查库存是否足够并预扣库存。
  2. 库存服务(Try):库存服务尝试扣减库存并锁定资源,返回成功或失败。
  3. 订单服务(Confirm):如果库存服务成功,订单服务调用 confirm 接口确认订单创建并提交。
  4. 库存服务(Confirm):库存服务执行库存扣减操作,确保库存被扣除。
  5. 失败时的Cancel
    • 如果订单服务没有接到库存的确认消息,或者库存不足,订单服务会调用 cancel 接口回滚订单。
    • 库存服务会在确认失败时调用 cancel 操作,恢复库存。

4. 最终一致性 + 事件驱动

在实际的电商系统中,可能更多的是采用 最终一致性 的方式来保证跨服务的数据一致性。系统通过 事件驱动架构 来保证不同服务之间最终的一致性。

  • 订单服务:创建订单后发布事件。
  • 库存服务:接收到库存扣减的事件并进行库存扣减操作。
  • 失败恢复:如果库存服务失败,则通过重试机制或者补偿机制恢复数据一致性。

总结

对于分布式系统中的订单与库存这类跨服务事务,无法依赖传统的数据库事务,需要通过一些分布式事务解决方案来保证数据一致性。常用的方式有:

  1. 消息队列 + 补偿机制:通过消息传递解耦订单与库存,使用补偿操作来保证一致性。
  2. Saga模式:将大事务拆分为多个局部事务,并通过补偿操作保证最终一致性。
  3. TCC模式:通过 Try-Confirm-Cancel 三步操作来保证事务一致性。

最常见的实践是 消息队列 + 补偿机制,通过保证消息的可靠性和幂等性,以及设置补偿机制来应对系统故障或异常情况,最终实现订单和库存之间的数据一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2273985.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

各品牌大语言模型汇总

2024年大语言模型快速发展,应用广泛。面对众多选择,我们整理了一份各大语言模型汇总表格,提供清晰参考,助您了解各模型参数(截止日期为2025年1月8日)。 高通智匠AI支持在Windows/Android/MAC等平台上使用 …

xtu oj 1614 数字(加强版)

输出格式# 每行输出一个样例的结果&#xff0c;为一个整数。 样例输入# 3 1 10 101 样例输出# 1 2 3 解题思路&#xff1a;这个题不要想复杂了&#xff0c;很容易超时。 首先需要注意的点&#xff0c;n<10的10000次方&#xff0c;用int或者long long都会爆&#xff0c;所…

【25考研】川大计算机复试情况,重点是啥?怎么准备?

24年进入复试的同学中&#xff0c;有10位同学的复试成绩为0分。具体是个人原因还是校方原因&#xff0c;还尚不明确。但是C哥提醒&#xff0c;一定要认真复习&#xff01;复试完后不要跟任何人讨论有关复试的题目及细节&#xff01; 一、复试内容 四川大学复试内容较多&#xf…

AR 眼镜之-拍照/录像动效切换-实现方案

目录 &#x1f4c2; 前言 AR 眼镜系统版本 拍照/录像动效切换 1. &#x1f531; 技术方案 1.1 技术方案概述 1.2 实现方案 1&#xff09;第一阶段动效 2&#xff09;第二阶段动效 2. &#x1f4a0; 默认代码配置 2.1 XML 初始布局 2.2 监听滑动对 View 改变 3. ⚛️…

STM32-笔记39-SPI-W25Q128

一、什么是SPI&#xff1f; SPI是串行外设接口&#xff08;Serial Peripheral Interface&#xff09;的缩写&#xff0c;是一种高速的&#xff0c;全双工&#xff0c;同步的通信总线&#xff0c;并且 在芯片的管脚上只占用四根线&#xff0c;节约了芯片的管脚&#xff0c;同时为…

JavaScript动态渲染页面爬取之Selenium

前面这篇博客讲解了 Ajax 的分析方法&#xff0c;利用 Ajax接口可以非常方便地爬取数据。只要能找到 Ajax接口的规律&#xff0c;就可以通过某些参数构造出对应的请求&#xff0c;自然就能轻松爬取数据啦。 但是在很多情况下&#xff0c;Ajax请求的接口含有加密参数&#xff0…

python学习笔记—14—函数

1. 函数 (1) len与my_len str "supercarrydoinb"def my_len(tmp_str):cnt 0for i in tmp_str:cnt 1return cntstr_len_1 len(str) str_len_2 my_len(str) print(f"len {str_len_1}") print(f"my_len {str_len_2}") (2) 函数传参数量不受…

Unity性能优化总结

目录 前言 移动端常见性能优化指标​编辑 包体大小优化 FPS CPU占用率 GPU占用率 内存 发热和耗电量 流量优化 前言 终于有时间了,我将在最近两个项目中进行优化的一些经验进行归纳总结以飨读者。因为我习惯用思维导图,所以归纳的内容主要以图来表达希望对大家有用。…

51单片机——定时器中断(重点)

STC89C5X含有3个定时器&#xff1a;定时器0、定时器1、定时器2 注意&#xff1a;51系列单片机一定有基本的2个定时器&#xff08;定时器0和定时器1&#xff09;&#xff0c;但不全有3个中断&#xff0c;需要查看芯片手册&#xff0c;通常我们使用的是基本的2个定时器&#xff…

基于html5实现音乐录音播放动画源码

源码介绍 基于html5实现音乐录音播放动画源码是一款类似Shazam的UI&#xff0c;点击按钮后&#xff0c;会变成为一个监听按钮。旁边会有音符飞入这个监听按钮&#xff0c;最后转换成一个音乐播放器。 效果预览 源码获取 基于html5实现音乐录音播放动画源码

对话|全年HUD前装将超330万台,疆程技术瞄准人机交互“第一屏”

2024年&#xff0c;在高阶智驾进入快速上车的同时&#xff0c;座舱人机交互也在迎来新的增长点。Chat GPT、AR-HUD、车载投影等新配置都在带来新增量机会。 高工智能汽车研究院监测数据显示&#xff0c;2024年1-10月&#xff0c;中国市场&#xff08;不含进出口&#xff09;乘用…

【技术支持】安卓无线adb调试连接方式

Android 10 及更低版本&#xff0c;需要借助 USB 手机和电脑需连接在同一 WiFi 下&#xff1b;手机开启开发者选项和 USB 调试模式&#xff0c;并通过 USB 连接电脑&#xff08;即adb devices可以查看到手机&#xff09;&#xff1b;设置手机的监听adb tcpip 5555;拔掉 USB 线…

Dependency check 通过Maven构建时,配置Mysql数据库遇到的三个坑

使用过Dependency check的同学&#xff0c;一定会遇到这个问题—— 每次执行依赖扫描时&#xff0c;由于网络问题会导致NVD下载种子数据的过程中的种种失败&#xff0c;不仅浪费了大量时间&#xff0c;还会因为下载文件的不完整性直接导致依赖检测的失败。所以我在使用Dependen…

uniApp通过xgplayer(西瓜播放器)接入视频实时监控

&#x1f680; 个人简介&#xff1a;某大型国企资深软件开发工程师&#xff0c;信息系统项目管理师、CSDN优质创作者、阿里云专家博主&#xff0c;华为云云享专家&#xff0c;分享前端后端相关技术与工作常见问题~ &#x1f49f; 作 者&#xff1a;码喽的自我修养&#x1f9…

【OJ刷题】同向双指针问题

这里是阿川的博客&#xff0c;祝您变得更强 ✨ 个人主页&#xff1a;在线OJ的阿川 &#x1f496;文章专栏&#xff1a;OJ刷题入门到进阶 &#x1f30f;代码仓库&#xff1a; 写在开头 现在您看到的是我的结论或想法&#xff0c;但在这背后凝结了大量的思考、经验和讨论 目录 1…

Ubuntu中使用miniconda安装R和R包devtools

安装devtools环境包 sudo apt-get install gfortran -y sudo apt-get install build-essential -y sudo apt-get install libxt-dev -y sudo apt-get install libcurl4-openssl-dev -y sudo apt-get install libxml2.6-dev -y sudo apt-get install libssl-dev -y sudo apt-g…

《分布式光纤测温:解锁楼宇安全的 “高精度密码”》

在楼宇建筑中&#xff0c;因其内部空间庞大&#xff0c;各类电器设施众多&#xff0c;如何以一种既高效又稳定&#xff0c;兼具低成本与高覆盖特性的方式&#xff0c;为那些关键线路节点开展温度监测&#xff0c;是目前在安全监测领域一项重点研究项目&#xff0c;而无锡布里渊…

git撤回提交、删除远端某版本、合并指定版本的更改

撤回提交 vscode的举例 一、只提交了还未推送的情况下 1.撤回最后一次提交&#xff0c;把最后一次提交的更改放到暂存区 git reset --soft HEAD~12.撤回最后一次提交&#xff0c;把最后一次提交的更改放到工作区 git reset --mixed HEAD~13.撤回最后一次提交&#xff0c;不…

【Spring Boot】Spring AOP 快速上手指南:开启面向切面编程新旅程

前言 &#x1f31f;&#x1f31f;本期讲解关于spring aop的入门介绍~~~ &#x1f308;感兴趣的小伙伴看一看小编主页&#xff1a;GGBondlctrl-CSDN博客 &#x1f525; 你的点赞就是小编不断更新的最大动力 &#x1f386;那么废话不…

开源CMS建站系统的安全优势有哪些?

近年来&#xff0c;用户们用开源CMS系统搭建网站的比例也越来越高&#xff0c;它为用户提供了便捷的网站建设解决方案。其中&#xff0c;亿坊CMS建站系统更因安全方面备受用户欢迎&#xff0c;下面带大家一起全面地了解一下。 一、什么是开源CMS&#xff1f; 开源CMS指的是那…