消息队列如何保证消息幂等性消费

news2025/1/11 19:40:23

1 介绍

我们实际系统中有很多操作,不管你执行多少次,都应该产生一样的效果或返回一样的结果。 例如:

  • 前端页面重复提交选中的数据,服务端只产生对应这个数据的一个反应结果,只保存一次数据。
  • 我们发起一笔付款请求,也只能扣用户账户一次钱,即使遇到网络重发或系统bug重发,也应该只扣一次金额。
  • 消息通知,也应该只能收到一次,如果收到多次的扣款通知短信,会让用户误解的。
  • 创建商品订单,一次业务请求只能创建一个,创建多个就会变成购买多次,就会出问题。

以上等等很多重要的场景,都需要幂等的特性来支持。

幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。 在编程中.一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。
例如,“getUserSex()和setRight()”函数就是一个幂等函数,包括数据库中的查询和删除也是一样的道理,它是天然幂等的。总之,幂等就是一个操作,不论执行多少次,产生的效果和返回的结果都是一样的 。

2 消息队列中如何保证幂等性

2.1 消息队列的基本构成

我们先来回顾下 Message Queue的构成,这边以RocketMQ为例子:
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。

  • NameServer:Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。NameServer 是整个 RocketMQ 的 "中央大脑 " ,它是 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。
  • Broker: 消息服务器,作为Server提供消息核心服务, 它接收并存储Producer生产的消息,也提供消息给Consumer消费。Broker一般会分主从,Master 可读可写,Slave 只读。
  • Producer: 消息生产者,消息的发送方,负责生产消息传输给broker。RocketMQ提供了发送:同步、异步和单向(one-way)的多种模式。
  • Consumer: 消息消费者,消息的处理方,负责从broker获取消息并进行业务逻辑处理。
    另外其他如 Topic、 Message,也是重要的组成部分:
  • Topic:主题,发布/订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播
  • Message:消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输。

2.2 消息队列的幂等分析

可以看出,消息发送和消息消费两个步骤是有可能产生消息不幂等的问题。
为保证消息的正确性发送,超时重试、异常重试、消费完成确认机制等能力都是可以使用,并对业务产生影响的。
我们举个例子,如果你购买一件商品,用户付款完成之后,通过MQ消息的异步通知,告知下游服务出库和通知。如果消息通知出现了问题或者下游消息消费出现了问题,导致无法ACK,都有可能导致重复的出库和通知。

2.2.1 消息生产的幂保证

MQ消息生产部分,就是下图中的步骤1、步骤2、步骤3:

  • 步骤1:消息生产端 MQ-Client Producer 将消息发给服务端MQ-server
  • 步骤2:消息队列服务 MQ-Server 将消息持久化存储
  • 步骤3:息队列服务 MQ-Server 返回确认信息(ACK \ CONSUME_SUCCESS \ offset)给消息生产端 MQ-Client Producer

如果3 消息确认故障导致消息丢失,则消息生产端 MQ-Client Producer 超时后会重发消息,这时候可能就有重复消息,如何保证幂等呢?
因为消息重发也是MQ-Client Producer发起的,消息的处理是消息队列的服务MQ-Server处理的,MQ-Server将数据进行了持久化么,这时候我们可以设计一个唯一的 msgId,作为去重的依据,无论重发多少次,msgId都是一样的,然后在DB数据库中将这个msgId设置为unique key,不允许重复,他有如下特性:

  • 全局唯一,不允许重复
  • MQ生成与业务无耦,对消息的生产和消费也是无强相关。

使用这个 msgId,可以保证只有1条消息落地到数据库中,就保证了消息生产端的幂等。

2.2.2 消息消费的幂保证

MQ消息消费部分,就是下图中的步骤4、步骤5、步骤6:

  • 步骤4:消息队列服务 MQ-Server 将消息发给给消费端 MQ-Client Consumer
  • 步骤5:消费端 MQ-Client Consumer 返回确认信息 (ACK \ CONSUME_SUCCESS \ offset) 给 消息队列服务
  • 步骤6:消息队列服务 MQ-Server 将持久化的消息数据删除,根据msgId精确删除

★ 说明:以上步骤须做一致性保障

这边重灾区就是步骤5,如果因为故障导致消息丢失,消息队列服务 MQ-Server 在超时后会重发消息,这样 MQ-Client Producer/Consumer 就会重复收到消息。
因为消息重发是 消息队列服务 MQ-Server 发起的,MQ-Client Consumer 负责消息消费,消息重发必然会导致业务重复消费(比如重复发消息、重复出库)。所以一样的道理,必然使用msgId来做判断,如果存在库中就进行消费,然后精确删除库中的数据。如果数据库中不存在,就忽略,避免重复消费。
同样的,这个msgID的特性如下:

  • 全局唯一,不允许重复
  • MQ生成与业务无耦,对消息的生产和消费也是无强相关。
  • 业务消息消费方 MQ-Client Consumer 负责判重,保证幂等性

这种方式最常见应用在:商品下单、消费支付、帖子点赞和留言等。

2.3 总结说明

无论是何种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。
只是不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。
那造成重复消费的原因? 就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。
如何解决?这个问题针对业务场景来答分以下几点
(1)给这个消息做一个唯一主键,做数据库insert,如果出现重复消费情况,会导致主键冲突,避免数据库出现脏数据。
(2)update 和 delete 支持天然幂等性,拿到这个消息做redis的set的操作,那就容易了,不用解决,set操作天然幂等操作。
(3)第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/143625.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

裸露土堆智能识别检测系统 yolo

裸露土堆智能识别检测系统基于pythonyolo计算机视觉深度学习技术&#xff0c;对现场画面中土堆裸露情况进行实时分析检测&#xff0c;若发现画面中的土堆有超过40%部分裸露&#xff0c;则判定为裸露进行抓拍预警。我们选择当下YOLO最新的卷积神经网络YOLOv5来进行裸露土堆识别检…

商用密码安全性评估

商用密码应用安全性评估&#xff08;简称“密评”&#xff09;指在采用商用密码技术、产品和服务集成建设的网络和信息系统中&#xff0c;对其密码应用的合规性、正确性和有效性等进行评估。01办理依据 GM/T0054-2018《信息系统密码应用基本要求》 《信息系统密码测评要求&…

Linux内核内存分配函数kmalloc、kzalloc和vmalloc

在内核环境中&#xff0c;常用的内存分配函数主要有kmalloc、kzalloc和vmalloc这三个。既然这三函数都能在内核申请空间&#xff0c;那么这三个函数有什么区别呢&#xff1f;如何选用呢&#xff1f; kmalloc 首先是kmalloc&#xff0c;其函数原型为 // /include/linux/slab.…

acwing基础课——质数

由数据范围反推算法复杂度以及算法内容 - AcWing 常用代码模板4——数学知识 - AcWing 基本思想&#xff1a; 首先&#xff0c;我们给出质数的定义&#xff0c;指在大于1的自然数中&#xff0c;除了1和该数自身外&#xff0c;无法被其他自然数整除的数。这里考虑三个问题&…

笔记-鼠标悬浮展示图标

鼠标悬浮展示图标 .primaryLink {color: primary-color-dark;}.primaryLink:hover {cursor: pointer;color: link-hover-color-dark;}.itemAction {display: none; }.itemMenu:hover .itemAction {display: block; }

【数据结构进阶】并查集

并查集 正如它的名字一样&#xff0c;并查集&#xff08;Union-Find&#xff09;就是用来对集合进行 合并&#xff08;Union&#xff09; 与 查询&#xff08;Find&#xff09; 操作的一种数据结构。 合并 就是将两个不相交的集合合并成一个集合。 查询 就是查询两个元素是否属…

链表常见OJ题汇总(持续更新)

目录前言一、移除链表中的元素&#xff08;多指针法&#xff09;二、反转链表&#xff08;多指针法&头插法&#xff09;三、链表的中间结点&#xff08;算数法和双指针法&#xff09;四、链表中的第K个结点&#xff08;算数法&双指针法&#xff09;五、合并两个有序链表…

vue 父子组件设置 scoped, 如何导致滚动条失效的

vue父组件的页面结构 // 调用子组件 <process-time-line :nodeArr"nodeArr"></process-time-line> 父组件的样式 <style lang"scss" scoped> ::-webkit-scrollbar {width: 0px;height: 0px;} </style>子组件的页面结构 <div …

学习C语言笔记:字符串和格式化输入/输出

学习内容&#xff1a;1.函数——strlen()&#xff1b;2.关键字——const&#xff1b;3.字符串&#xff1b;4..如何创建、存储字符串&#xff1b;5.如何使用strlen()函数获取字符串的长度&#xff1b;6.用C预处理器指令#define和ANSIC的const修饰符创建符号常量。与程序交互和使…

《Linux运维实战:Centos7.6基于docker-compose一键离线部署redis6.2.8之哨兵集群》

一、部署背景 由于业务系统的特殊性&#xff0c;我们需要面向不通的客户安装我们的业务系统&#xff0c;而作为基础组件中的redis针对不同的客户环境需要多次部署哨兵集群&#xff0c;作为一个运维工程师&#xff0c;提升工作效率也是工作中的重要一环。所以我觉得有必要针对re…

(Java高级教程)第三章Java网络编程-第一节3:网络编程必备网络知识3之IP地址、端口号

文章目录一&#xff1a;网络传输基本流程&#xff08;1&#xff09;数据包&#xff08;2&#xff09;网络传输的基本流程&#xff08;3&#xff09;具体处理过程A&#xff1a;发送数据B&#xff1a;路由转发C&#xff1a;接受数据二&#xff1a;网络中的地址&#xff08;1&…

Elasticsearch-使用入门

_cat /_cat/nodes&#xff1a;查看所有节点 接口&#xff1a;GET http://192.168.177.134:9200/_cat/nodes /_cat/health&#xff1a;查看ES健康状况 接口&#xff1a;GET http://192.168.177.134:9200/_cat/health /_cat/master&#xff1a;查看主节点信息 接口&#xff1a;G…

【Azure 架构师学习笔记】-Azure Logic Apps(3)-演示1

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Logic Apps】系列。 接上文【Azure 架构师学习笔记】-Azure Logic Apps&#xff08;2&#xff09;-组件介绍 前言 前面两篇文章大概介绍了一些理论知识&#xff0c;但是为用而学才是最重要的&#xff0c;所以接下来做…

word排版时如何保证每张图片大小一样?

问题描述 为了保证文档的美观性&#xff0c;在对图片进行排版时&#xff0c;最好保证图片的大小一致&#xff0c;尤其是多张图片组成一张大图时。 如果一张张图片调整大小&#xff0c;那真的是毫无技术含量的耗时工作。 解决方案 在这提出一种借助表格的解决办法。比如有4张…

Parasoft发布最广泛的MISRA规则覆盖-C/C++test最新版本正式上线!

作为拥有30多年自动化软件测试经验的全球领导者Parasoft宣布发布Parasoft C/Ctest的最新2022.2版本&#xff0c;支持MISRA C:2012修正案3和MISRA C 202x的草案版本。Parasoft针对C和C软件开发的统一、完全集成的测试解决方案的最新版本&#xff0c;帮助团队实现自动化静态分析和…

【java入门系列三】java基础-控制结构

学习记录&#x1f914;分支控制if-elseswitch分支接收字符for循环控制while循环do-while打印金字塔break终止-可以用label&#xff1a;表明continue与break类似return循环中表示直接退出方法(函数)&#xff0c;主方法直接结束字符串比较trick讨论总结谢谢点赞交流&#xff01;(…

外观模式

外观模式 1.外观模式介绍 1.外观模式&#xff08;Facade&#xff09;&#xff0c;也叫“过程模式&#xff1a;外观模式为子系统中的一组接口提供一个一致的界面&#xff0c;此模式定义了一个高层接口&#xff0c;这个接口使得这一子系统更加容易使用 2.外观模式通过定义一个一…

Linux(06)之获取内核代码

Linux(06)之获取内核代码 Author&#xff1a;OnceDay Date&#xff1a;2023年1月5日 漫漫长路&#xff0c;有人对你微笑过嘛… 参考文档&#xff1a; 《Linux内核设计和实现》 1.概述 linux内核的基本架构如下&#xff1a; 所以每个处理器运行的地方只有以下可能&#xf…

带你玩转指针——指针进阶(二)

上次我们说到了函数指针&#xff0c;对于函数指针大家还不太清楚的参考&#xff0c;指针进阶&#xff08;一&#xff09;http://t.csdn.cn/z5cjM函数指针数组数组是存放相同类型的空间&#xff0c;前面我们已经学习了指针数组int* arr[10] 每个元素是int*那么我们把函数的地址存…

grpc实现c++异步非阻塞stream

grpc实现c异步非阻塞stream 参考文章 Non-blocking single-threaded streaming C servergRPC C async api doc and sample codegrpc异步stream server端demo 序言 原来一直是用着同步阻塞的grpc stream。由于不想再创建新的线程来监听grpc stream的新消息了&#xff0c;所以就…