RocketMQ使用(3):消息重复

news2025/1/11 14:16:15

一、问题说明

发送时消息重复

        当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。

投递时消息重复

        消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。

负载均衡时消息重复

        包括但不限于网络抖动、Broker重启以及消费者应用重启,当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance(再平衡),此时消费者可能会收到重复消息。

二、简单的去重解决方案

        假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。

-- 添加一条订单数据
insert into t_order values .....
-- 对应的商品库存-1
update t_inv set count = count-1 where good_id = 'good123';

        要实现消息的幂等,我们可能会采取这样的方案:

-- 查询是否存在该订单
select * from t_order where order_no = 'order123'
/*
// 在代码中判断
if(order  != null) {
    return ;//消息重复,直接返回
}

*/

        这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

三、并发重复消息

        假设这个消费的所有代码加起来需要 1 秒,有重复的消息在这 1 秒内(假设 100 毫秒)内到达。例如生产者快速重发,Broker 重启等。那么很可能,上面去重代码里面会发现,数据依然是空的,因为上一条消息还没消费完,还没成功更新订单状态。具体一点就是两个线程在间隔非常短甚至是同时执行这个逻辑:

select * from t_order where order_no = 'order123'

        然后发现都没有查到数据,于是走入到这个逻辑中:

if(order  != null) {
    return ;//消息重复,直接返回
}

        那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题,如主键冲突抛出异常、库存被重复扣减而没释放等。要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把 select 改成 select for update 语句,把记录进行锁定:

-- 该代码在同一事物中
select * from t_order where order_no = 'THIS_ORDER_NO' for update  //开启事务

if(order.status != null) {
    return ;//消息重复,直接返回
}

        但这样消费的逻辑会因为引入了事务包裹而导致整个消息消费可能变长,并发度下降。当然还有其他更高级的解决方案,例如更新订单状态采取乐观锁,更新失败则消息重新消费之类的。但这需要针对具体业务场景做更复杂和细致的代码开发、库表设计,不在本文讨论的范围。但无论是select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度。一个业务系统里面很大部分的请求处理都是依赖 MQ 的,如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。本文希望探索出一个通用的消息幂等处理的方法,从而抽象出一定的工具类用以适用各个业务场景。

四、Exactly Once(仅一次)

        在消息中间件里,有一个投递语义的概念。而这个语义里有一个叫 Exactly Once ,即消息肯定会被成功消费,并且只会被消费一次。以下是官方文档对 Exactly Once 的解释:

Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。

        在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执行,并且只被执行一次,那么我们可以认为是 Exactly Once。但这在分布式的场景下想找一个通用的方案几乎是不可能的。不过如果是针对基于数据库事务的消费逻辑,实际上是可行的。另外,关于 Exactly-Once 再补充一些下。Exactly-Once 语义是消息系统和流式计算系统中消息流转的最理想状态,但是在业界并没有太多理想的实现。因为真正意义上的 Exactly-Once 依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。例如,当您的消费端完成一条消息的消费处理后出现异常宕机,而消费端重启后由于消费的位点没有同步到消息系统的服务端,该消息有可能被重复消费。业界对于 Exactly-Once 投递语义存在很大的争议,很多人会拿出“FLP不可能理论”或者其他一致性定律对此议题进行否定,但事实上,特定场景的Exactly-Once语义实现并不是非常复杂,只是因为通常大家没有精确的描述问题的本质。如果要实现一条消息的消费结果只能在业务系统中生效一次,需要解决的只是如何保证同一条消息的消费幂等问题。消息队列 RocketMQ 版的 Exactly-Once 语义就是解决业务中最常见的一条消息的消费结果(消息在消费端计算处理的结果)在数据库系统中有且仅生效一次的问题。

1、基于关系数据库事务插入消息表

        假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态。

update t_order set status = 'SUCCESS' where order_no= 'order123';

        要实现 Exaclty Once 即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做。在这个数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。流程看起来像是这样的:

  1. 开启事务
  2. 插入消息表(处理好主键冲突的问题)
  3. 更新订单表(原消费逻辑)
  4. 提交事务

        这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了。这时候就算 RocketMQ 还没有收到消费位点的更新,从而再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功。而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。事实上,阿里云的 RocketMQ 的 EXACTLY-ONCE 语义的实现上,就是类似这个方案基于数据库的事务特性实现的:

        基于这种方式,的确这是有能力拓展到不同的应用场景,因为它的实现方案与具体业务本身无关——而是依赖一个消息表。但是这里有它的局限性:消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如 Redis 这种不支持事务特性的数据源,则这些数据是不可回滚的。还有,数据库的数据必须是在一个库,跨库无法解决。另外,需要特别注意的是:在业务上,消息表的设计不应该以消息 ID 作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。

2、更复杂的业务场景

        如上所述,这种方式 Exactly Once 语义的实现,实际上有很多局限性,这种局限性使得这个方案基本不具备广泛应用的价值。且由于基于事务,可能导致锁表时间过长等性能问题。例如我们以一个比较常见的一个订单申请的消息来举例,可能有以下几步:

  • 检查库存(RPC)
  • 锁库存(RPC)
  • 开启事务,插入订单表(MySQL)
  • 调用某些其他下游服务(RPC)
  • 更新订单状态
  • commit 事务(MySQL)

        这种情况下,我们如果采取消息表+本地事务的实现方式,消息消费过程中很多子过程是不支持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原子性的。怎么说呢?就是说有可能第一条消息在经历了第二步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务里被锁定了,这并不能被回滚。当然消息还会再次投递下来,要保证消息能至少消费一遍,换句话说,锁库存的这个RPC接口本身依旧要支持“幂等”。再者,如果在这个比较耗时的长链条场景下加入事务的包裹,将大大的降低系统的并发。所以通常情况下,我们处理这种场景的消息去重的方法还是会使用一开始说的业务自己实现去重逻辑的方式,如前面加 select for update,或者使用乐观锁。

3、更通用的解决方案

        上面消息表+本地事务的方案之所以有其局限性和并发的短板,究其根本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个消息消费的环节。如果我们能不依赖事务而实现消息的去重,那么方案就能推广到更复杂的场景例如:RPC、跨库等。例如,我们依旧使用消息表,但是不依赖事务,而是针对消息表增加消费状态,是否可以解决问题呢?接下来就要祭出基于消息幂等表的非事务方案了。

        我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。使用Redis有两个好处:

  • 性能上损耗更低
  • 上面我们讲到的超时时间可以直接利用Redis本身的ttl实现

        当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。使用这个方法能保证正常的消费逻辑场景下(无异常,无异常退出),消息的幂等工作全部都能解决,无论是业务重复,还是 RocketMQ 特性带来的重复。事实上,这已经能解决 99% 的消息重复问题了,毕竟异常的场景肯定是少数的。那么如果希望异常场景下也能处理好幂等的问题,可以做以下工作降低问题率:

  1. 消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。
  2. 消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。
  3. 一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等)
  4.  在第 3 步做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好第 1 步的回滚,使得下次重试消费成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1692003.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue项目elementui刷新页面弹窗问题

bug:每次刷新页面都有这个鬼弹窗。 刚开始以为是自己的代码问题,于是我翻遍了每一行代码,硬是没找出问题。 后来在网上找了些资料,原来是引入的问题。 解决方案: 改一下引入方式即可。 错误姿势 import Vue from …

Autodesk 3ds Max下载,3ds MAX 2024三维建模渲染软件安装包下载安装

3ds MAX中文版,其强大的功能和灵活的操作为广大用户提供了无限的创意空间,使得高质量动画、最新游戏、设计效果等领域的制作需求得以完美满足。 ​ 作为一款三维建模软件,3ds MAX中文版具备极高的建模精度和渲染质量。它支持多种建模方式&am…

Golang项目代码组织架构实践

Golang在项目结构上没有强制性规范,虽然这给了开发者很大的自由度,但也需要自己沉淀一套可行的架构。本文介绍了一种项目布局,可以以此为参考设计适合自己的 Golang 项目组织模式。原文: Golang Project Layout Go 有很多强制的或是约定俗成的…

Python学习---基于TCP协议的网络通信程序案例

TCP简介: ●TCP 面向连接、可靠的、基于字节流的传输控制协议 ●TCP的特点 ○面向连接 ○可靠传输 ■应答机制 ■超时重传 ■错误校验 ■流量管控 ●TCP通信模型 TCP严格区分客户…

2024年5月25日 十二生肖 今日运势

小运播报:2024年5月25日,星期六,农历四月十八 (甲辰年己巳月己丑日),法定节假日。 红榜生肖:鸡、鼠、猴 需要注意:马、狗、羊 喜神方位:东北方 财神方位:…

篮球论坛|基于SprinBoot+vue的篮球论坛系统(源码+数据库+文档)

篮球论坛系统 目录 基于SprinBootvue的篮球论坛系统 一、前言 二、系统设计 三、系统功能设计 1系统功能模块 2管理员功能模块 3用户功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取: 博主介绍&#xff…

抖音运营_打造高流量的抖音账号

目录 一 账号定位 行业定位 用户定位 内容定位 二 账号人设 我是谁? 我的优势 我的差异化 三 创建账号 名字 头像 简介 四 抖音养号 为什么要养号? 抖音快速养号 正确注册抖音账号 一机一卡一号 实名认证 正确填写账号信息 养号期间的操作 五…

OpenWrt 23.05 安装中文语言包 教程 软路由实测 系列三

1 web 登录 #更改阿里云下载源,可参考第一篇文章:OpenWrt U盘安装使用 详细教程 x86/64平台 软路由实测 系列一-CSDN博客

如何网页在线编辑 Office word 文档,并支域功能:创建域/插入域/替换域等

在日常在线办公场景中,我们经常会遇到一些复杂的文档编辑需求,特别是我们经常会遇到一些复杂的数学公式,会用到“域”功能,“域”功能便是一个高级且实用的工具。通过设置域,用户可以实现文档的自动化处理,…

聚观早报 | 华为畅享 70S真机图赏;vivo Y200 GT开售

聚观早报每日整理最值得关注的行业重点事件,帮助大家及时了解最新行业动态,每日读报,就读聚观365资讯简报。 整理丨Cutie 5月25日消息 华为畅享 70S真机图赏 vivo Y200 GT开售 一加13部分细节曝光 马斯克谈AI未来 三星Galaxy Z Fold6将…

轻量级 K8S 环境 安装minikube

文章目录 操作系统DockerDocker CE 镜像源站使用官方安装脚本自动安装 (仅适用于公网环境)安装校验Docker代理docker permission denied while trying to connect to the Docker daemon socket minikubekubectl工具minikube dashboard参考资料 操作系统 …

[图解]产品经理创新之阿布思考法

0 00:00:00,000 --> 00:00:01,900 那刚才我们讲到了 1 00:00:02,730 --> 00:00:03,746 业务序列图 2 00:00:03,746 --> 00:00:04,560 然后怎么 3 00:00:05,530 --> 00:00:06,963 画现状,怎么改进 4 00:00:06,963 --> 00:00:09,012 然后改进的模式…

简洁实用视频播放器-PotPlayer

一、前言 PotPlayer 是一款简洁实用的视频播放器。 发现的确是良心软件,只有20M 的大小,占内存是同类软件最低的。不要小看它那么小巧简洁,但也很强大的,支持强劲的加速引擎,同时支持3D 视频。 同时支持多种编码和字…

生产物流智能优化系统

对生产调度、物流调度【车辆路径问题、配送中心拣选问题】智能优化算法研究形成系统性程序,逐步开发设计一个智能优化系统【包括:问题说明、实验界面、算法结构和算法程序应用说明】, 当前完成TSP和集送车辆路径的算法程序,程序效…

移动端仪表盘,支持更多组件

05/22 主要更新模块概览 定位函数 快捷筛选 轨迹图表 时间组件 01 表单管理 1.1 【表单组件】- 表单关联新增支持自定义按钮样式 说明: 表单关联-关联数据按钮,原仅支持默认按钮样式,现增加关联数据按钮自定义功能,满…

生活小区火灾预警新篇章:泵吸式可燃气体报警器的检定与运用

在现代化的生活小区中,燃气设备广泛应用于居民的日常生活之中,但同时也带来了潜在的火灾风险。 可燃气体报警器作为一种安全监测设备,能够及时检测到燃气泄漏等安全隐患,并在达到预设的阈值时发出警报,提醒居民采取相…

Doris集群安装部署

Doris集群安装部署 一、环境搭建 1、环境准备 主机名IP角色doris1192.168.100.131Frotend,Backenddoris2192.168.100.132Backenddoris3192.168.100.133Backend 2、Doris整体架构 Frontend(FE) 主要负责用户请求的接入、查询解析规划、元数据的管理…

Python学习---基于HTTP的服务端基础框架搭建案例

整体功能: 1 创建框架构建相关的文件夹 2 创建app,模块文件 3 在 app模块文件中创建application函数(用于处理请求) 4 将request_handler()中的处理逻辑交由app模块的application函数完成 5 app模块的 application函数返回响应报文 6 在application 文件夹中创建一个…

blender 布尔运算,切割模型。

1.创建一个立方体和球体。 2.选中立方体,在属性面板添加布尔修改器。点击物体属性右边的按钮选中球体。参数如下。 3.此时隐藏球体,就可以看到被切掉的效果了。

TENT: FULLY TEST-TIME ADAPTATION BY ENTROPY MINIMIZATION--论文笔记

论文笔记 资料 1.代码地址 https://github.com/DequanWang/tent 2.论文地址 https://arxiv.org/abs/2006.10726 1论文摘要的翻译 在这种完全测试时适应的情况下,模型只有测试数据和自身参数。我们建议通过测试熵最小化(tent)进行适应&…