【程序大侠传】服务发布引发mq消息重复消费

news2025/1/15 19:51:09

前序

在编程武侠世界中,有一个门派“天机楼”,连接并协调各大门派之间的关系,确保整个江湖的运作流畅无阻。天机楼住要的业务范围主要如下:

  • 信息传递的信使:
    天机楼就像是江湖中的飞鸽传书,确保各门派之间的信息能够快速、准确地传递。无论是战斗指令、情报交换还是紧急求援,天机楼都能可靠地完成任务。
  • 系统稳定的守护者:
    它如同一位隐形的护法,时刻监控着江湖的运作,确保各门派的系统稳定运行,避免因系统故障而引发的江湖动乱。
  • 性能优化的高手:
    天机楼精通各种优化技巧,能够在复杂的江湖环境中找到最佳的解决方案,提升系统的性能,让各系统的操作更加高效与流畅。
  • 负载均衡的调度者:
    如同武林盟主一般,天机楼可以合理调配各门派的资源,确保每个门派都能均衡发展,避免资源过度集中或分配不均。
  • 问题解决的医者:
    当江湖中出现问题时,天机楼能够迅速诊断并修复问题,像神医华佗一样,确保江湖的和平与稳定。

天机楼的武器:

  • 消息队列:
    如同传递消息的飞鸽,确保信息快速且准确地到达目的地。
  • 缓存系统:
    犹如藏在暗处的密库,能快速提供所需的资源。
  • 负载均衡:
    如同武林盟主,能够调配各系统的资源,确保每个系统都能均衡发展。

阿强所在的世界中,天机楼一直是基石般的存在,这个神秘的组织中每个人的技术功力都非常强悍。以至于天机楼虽然人数不多,但是地位与实力却一直很高。此组织成员一般散落在世界各地,统一由天机阁管理。而天机阁中的成员所接的任务难度跟积分跟平常门派所发布的任务都要高出不少,也正是因为如此,很多觉得自己实力还不错的人都想加入其中,但很多人都低估 了其考核难度,而此组织如果你实力没有达到他的考核标准是没有其他的途径进入的。而加入天机楼除了能够获取高积分的任务之外,还有的好处是,此组织跟自己的门派不冲突,也就是说,天机楼的身份不影响你可以在任何门派中担任职位。而阿强就是除了是目前所在代码剑宗的武林高手身份外,也在天机阁中担任中级侠客一职。而所谓的天机阁内部也有着自己的一套等级划分:

  • 初级侠客(新手):
    职责:负责基础的中间件配置和维护工作,处理常见的小问题和简单的优化任务。
    能力:掌握基础的中间件技术,能够进行基本的安装、配置和常见问题排查。
  • 中级侠客(高手):
    职责:负责复杂的中间件部署和优化任务,能够独立解决较为复杂的问题。
    能力:精通常见的中间件技术,具备较强的系统优化和问题解决能力,能够进行性能调优和负载均衡配置。
  • 高级侠客(大侠):
    职责:负责整个系统的中间件架构设计和优化,能够处理高难度的技术问题和进行系统级的优化。
    能力:深谙各种中间件技术,具备系统架构设计能力,能够进行复杂的系统集成和全面的性能优化。
  • 宗师(顶级专家):
    职责:负责中间件技术的战略规划和创新,领导团队进行技术攻关和前沿技术探索。
    能力:拥有深厚的技术积累和丰富的实战经验,能够在技术上引领团队,推动中间件技术的发展和创新。

阿强处理完上次门派中的pagehelper的紧急任务后就一直沉浸在进行门派中的另一个开发任务中,不知道过了多久,阿强满脸兴奋地从自己的洞府走出,他经过这段时间的闭关,终于将那个开发任务给开发完并交给断点神教小美测试,此时的他一身轻松地望向天空不禁有些恍惚,大约过了2刻钟,天机阁突然发布了一条任务“L服务出现mq重新消费,请及时处理”,本来此时的阿强就是无事一身轻,正想着去天机阁找个任务,因此看到任务这么合时宜地发布,阿强毫不犹豫就认领了下来,不多时,阿强就全身心地投入到此任务中…。

第四章 什么?mq重复消费了?

15分钟过后,阿强了解到此次的问题暴露是由于L服务侧下游F服务(没错,就是上次pagehelper 分页sql问题服务)生成了两条一样的由L服务调用产生的订单。而L调用F服务生成订单的时序图如下:
在这里插入图片描述

从时序图不难看出,这个生成订单的过程是一个异步过程,而L服务的异步处理是通过MQ来实现。知道了整体交互逻辑的阿强打开idea查看L服务中MQ的消费逻辑:

//下面是L服务消息消费的伪代码
public MsgStatus onMessage(ConsumeMessage msg){
		log.info("【xxx异步处理】接收MQ消息:{}", message);
		if (StringUtils.isBlank(message)) {
            return MsgStatus.SUCCEED;
        }
        Entity entity = dataConversion();//数据转换
        LOrderEntity  LOrderEntity = LOrderRepository.selectByLOrderId(entity.getLOrderId());
        //幂等校验,但是在某些场景下没有用,如本案例中
		if(StringUtils.isNotEmpty(LOrderEntity.getTaskId()))){
			log.warn("【xxx异步处理】mq重复发送消息");
			return MsgStatus.SUCCEED;
		}
		//构建请求入参
		FOrderReq req = buildReq(LOrderEntity,msg);
		RpcResponse res = FRpc.createOrder(req);
		//构建更新字段实体
		LOrderEntity  LOrderEntity1  = buildLOrderField(res.getTaskId(),res.getStates());
		LOrderRepository.updateOrder(LOrderEntity1);
        return MsgStatus.SUCCEED;
    }

阿强梳理完L服务的异步消息处理逻辑,随即就开始通过天书法器查看L服务的日志,不多时,阿强就通过“【xxx异步处理】接收MQ消息”关键字在天书上看到了两条很奇怪的日志,这两条日志都是mq消费逻辑打印出来,且后面输出的mq消息体字段全都是一样。
在这里插入图片描述
看到两条一样的消息体,阿强陷入了沉思,随即他又通过“发送mq”关键字,在这条链路上搜索,发现只出现了一条日志,也就是说,mq生产者只发送了一条消息,却消费了两次。
在这里插入图片描述
看到这种情况,阿强脑子里面浮现几种猜想,第一种是:天机楼的消息处理发生抖动导致的重复消费;第二种是:L服务的消费者没有去ack。此时的阿强也没办法确定是那种情况导致的重复消费,此时最好的办法就是使用排除法。同身为天机阁成员,阿强有权限去查看某个消息的一个消费情况跟整个消息处理服务的监控。当阿强打开天眼系统查看消息处理服务的监控,发现整个服务的指标都很正常,并没有什么抖动。此时第一种猜想已经验证是没有问题的,阿强快马加鞭地开始第二种猜想验证。只见他打开天书系统的链路耗时
在这里插入图片描述
发现整个消费的耗时远远没有达到消息ack处理超时时间3分钟的。此时的阿强双眉紧凑,脑中已经开始了头脑风暴。如果没有达到消息ack处理超时时间,那么还会有什么场景会让mq消息ack失效呢?阿强在脑中重新回顾了一下RocketMQ的ack机制(因为L系统使用的RocketMQ消息中间件):

  1. 消息消费确认机制
    RocketMQ 采用了“消费者主动确认”的机制,即消费者在成功处理完消息后,主动向 Broker 发送 ACK 确认。这与一些消息队列系统的自动确认机制不同,能够确保消息被成功处理后才被确认。
  2. 消息消费过程
    消息发送:生产者将消息发送到 RocketMQ Broker。
    消息存储:Broker 接收到消息后,将消息存储在磁盘中,并将消息写入 CommitLog。
    消息拉取:消费者从 Broker 拉取消息进行消费。
    消息处理:消费者接收消息并进行处理。
    确认消息:消息处理成功后,消费者向 Broker 发送 ACK 确认。
  3. 消息重试机制
    如果消费者在处理消息过程中出现异常或失败,没有发送 ACK 确认,RocketMQ 会认为该消息未被成功消费,并会进行重试。重试机制确保消息不会丢失,但可能会出现消息重复消费的情况。为此,消费者需要实现幂等性处理。

此时的阿强已经没有一开始接收这个任务时轻松的心态,他的脑海不断浮现各种知识内容,去思考可能出现场景。大约过了2小时,阿强的眼睛闪过一丝光亮,他连忙打开了天剑部署系统去查看最后一次L服务部署时间,不多时,阿强深沉的眼神中闪过一丝兴奋,嘴角露出了一丝笑容。但是为了确认自己的猜想,他回顾了一下rocketmq消费的存储方式、消费模式:

  1. 消费进度的存储方式
    RocketMQ 支持两种消费进度(offset)的存储方式:
    Broker 端存储:消费进度保存在 Broker 上,消费者重启后会从 Broker 获取最新的消费进度。
    消费者本地存储:消费进度保存在本地磁盘,消费者重启后会从本地磁盘读取消费进度。
    如果消费进度存储在 Broker 上,那么即使消费者重新部署,重新启动后也会从 Broker 获取最新的消费进度,避免消息重复消费的情况。
  2. 消费模式
    RocketMQ 支持两种消费模式:
    集群消费(Clustering):同一个消费组内的多个消费者实例会均摊消息,每条消息只会被其中一个消费者实例消费。当某个消费者实例重启时,其他实例会接管它的消费任务。
    广播消费(Broadcasting):每个消费者实例都会消费所有的消息。当某个消费者实例重启时,重新启动后会重新消费所有未确认的消息。

再结合L项目的消费模式,真相大白了,L项目当时由于有人在重启项目,消息还未ack,机器重启导致的消息重复消费。知道原因的阿强呼出一大口浊气,脸上的眉目也放松下来。接下来,就只需怎么处理这种特殊场景。
在这里插入图片描述
怎么处理这种问题,阿强心里已经有了方案,第一种方案是让F服务做幂等处理;第二种方案是在L服务的mq消息消费逻辑里面做一个幂等处理。最终阿强结合本次任务选择了第二种方案,他用idea
打开了L系统的代码,并进行了一个改造:

//下面是L服务消息消费的伪代码
public MsgStatus onMessage(ConsumeMessage msg){
		log.info("【xxx异步处理】接收MQ消息:{}", message);
		if (StringUtils.isBlank(message)) {
            return MsgStatus.SUCCEED;
        }
        Entity entity = dataConversion();//数据转换
        RedisLock lock = RedisClientManagement.createLock(entity.getUserId,entity.getApplyNo());
        try {
        	if (lock.blockAcquireLock(RedisTimeOut.FIVE_MINUTE,RedisTimeOut.SECOUND)) {
		        LOrderEntity  LOrderEntity = LOrderRepository.selectByLOrderId(entity.getLOrderId());
		        //幂等校验,但是在某些场景下没有用,如本案例中
				if(StringUtils.isNotEmpty(LOrderEntity.getTaskId()))){
					log.warn("【xxx异步处理】mq重复发送消息");
					return MsgStatus.SUCCEED;
				}
				//构建请求入参
				FOrderReq req = buildReq(LOrderEntity,msg);
				RpcResponse res = FRpc.createOrder(req);
				//构建更新字段实体
				LOrderEntity  LOrderEntity1  = buildLOrderField(res.getTaskId(),res.getStates());
				LOrderRepository.updateOrder(LOrderEntity1);
        	}
        }else{
          log.info("Existing lock key = {}",key);
        }
        }catch (Exception e) {
           log.error("消费异常,不重新消费",e);
           return MsgStatus.SUCCEED;
        } finally {
            lock.releaseLock();
        }

        return MsgStatus.SUCCEED;
    }

当阿强把改完的代码提交完,恋恋不舍地看了看美好世界,一会后又开始进行回到洞府内修炼起来…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1931946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学生管理系统(C语言)(Easy-x)

课 程 报 告 课 程 名 称: 程序设计实践 专 业 班 级 : XXXXX XXXXX 学 生 姓 名 : XXX 学 号 : 231040700302 任 课 教 师 &a…

电瓶车检测AI算法:视频智能分析技术助力电瓶车规范与安全管理

随着电瓶车(电动自行车)的普及,其在城市交通中扮演着越来越重要的角色。然而,电瓶车的管理、安全监控以及维护等方面也面临着诸多挑战。近年来,人工智能(AI)技术的发展为解决这些问题提供了新的…

Ubuntu安装virtualbox(win10)

virtualbox下载安装 1、下载virtualbox 下载路径:Linux_Downloads – Oracle VM VirtualBox 根据自己的Ubuntu版本选择对应的安装包下载 2、安装virtualbox 到下载路径(一般为~/Download)打开终端输入命令 sudo dpkg -i xxx.deb 继续执…

求解答word图标变白

把WPS卸载了之后就变成白色了,然后在注册表中把word的地址改成office word的地址之后图标变成这样了,怎么办

【漏洞复现】Rejetto HTTP文件服务器——远程命令执行(CVE-2024-23692)

声明:本文档或演示材料仅供教育和教学目的使用,任何个人或组织使用本文档中的信息进行非法活动,均与本文档的作者或发布者无关。 文章目录 漏洞描述漏洞复现测试工具 漏洞描述 Rejetto HTTP文件服务器是一个轻量级的HTTP服务器软件&#xff…

电源芯片MPQ3431A

一、芯片介绍 MPQ3431A是一款具有宽输入范围的固定频率为450kHz的高度集成的升压转换器,其输入电压低至2.7V,采用恒定关断时间(COT)的控制拓扑,可提供快速的瞬态响应。芯片支持通过MODE管脚配置PSM(pulse-…

redis基本类型和订阅

redis-cli -h <host> -p <port> -a <password> 其中&#xff0c;< host>是Redis服务器的主机名或IP地址&#xff0c;< port>是Redis服务器的端口号&#xff0c;< password>是Redis服务器的密码&#xff08;如果有的话&#xff09;。 set …

FPGA CFGBVS 管脚接法

说明 新设计了1个KU040 FPGA板子&#xff0c;回来之后接上JTAG FPGA不识别。做如下检查&#xff1a; 1、电源测试点均正常&#xff1b; 2、查看贴片是否有漏焊&#xff0c;检查无异常&#xff0c;设计上NC的才NC&#xff1b; 3、反复检查JTAG接线是否异常&#xff0c;贴片是…

Large Language Model系列之一:语言模型与表征学习(Language Models and Representation Learning)

语言模型与表征学习&#xff08;Language Models and Representation Learning&#xff09; 1 语言模型 N-Gram模型 from collections import defaultdictsentences [The swift fox jumps over the lazy dog.,The swift river flows under the ancient bridge.,The swift br…

C语言 ——— 编写代码,判断 整型数组 是否 有序

目录 题目要求 代码实现 题目要求 判断 整型数组 是否有序 如果 整型数组 有序输出 sorted&#xff1b;否则输出 unsorted 代码实现 #include<stdio.h> int main() {int arr[10] { 0 };int sz sizeof(arr) / sizeof(arr[0]);//输入for (int i 0; i < sz; i){s…

大数据基础:Doris重点架构原理

文章目录 Doris重点架构原理 一、Apache Doris介绍 二、Apache Doris使用场景 三、Apache Doris架构原理 四、Apache Doris 特点 Doris重点架构原理 一、Apache Doris介绍 基于 MPP 架构的高性能、实时的分析型数据库&#xff0c;以极速易用的特点被人们所熟知&#xff…

YOLOV5的输出[1,25200,85]如何理解和解析

1、25200代表着检测框的数量&#xff0c;比如我们取出第一个检测框a&#xff0c;也就是[1,1&#xff0c;85]&#xff0c;取出来之后我们解析85&#xff0c;前五个为box的中点坐标、长宽值以及置信&#xff0c;后面80我们取Max&#xff08;80个类别&#xff09;中最大值&#xf…

【解决】多个网卡导致nacos注册的服务ip有误问题

解决办法 在本地idea中启动的时候添加启动配置&#xff1a; 方法一 -Dspring.cloud.inetutils.preferred-networks你自己网卡的ip 方法二 -Dspring.cloud.nacos.discovery.ip你自己网卡的ip

朴素模式匹配算法与KMP算法(非重点)

目录 一. 朴素模式匹配算法1.1 什么是字符串的匹配模式1.2 朴素模式匹配算法1.3 通过数组下标实现朴素模式匹配算法 二. KMP算法2.1 算法分析2.2 用代码实现&#xff08;只会出现在选择题&#xff0c;考察代码的概率不大&#xff09; 三. 手算next数组四. KMP算法的进一步优化4…

【CH32V303RCT6】NB模块在CTwing下的OTA升级[实操过程篇]

一、 本篇内容简介 本篇主要通过日志打印和一些云端的显示&#xff0c;来讲解整个SOTA升级的过程。 二、实验现象 2.1、目标代码 2.2、最终效果 当我们升级成功后&#xff0c;会跳转到APP_B执行程序。 三、设计思路 根据电信云平台的OTA设计特性&#xff0c;本次设计是通过…

【自撰写】【国际象棋入门】第11课 对局实例分析(一)

第11课 对局实例分析&#xff08;一&#xff09; 本次课中&#xff0c;我们来分析一例真实的对局。对局弈于“国象联盟”APP&#xff0c;日期为2024年6月13日星期四&#xff0c;我执黑。开局伊始&#xff0c;白方的布局略占优势&#xff0c;中局阶段黑方一直保持着微弱的领先&…

[AWS]CodeCommit的创建与使用

背景&#xff1a;CodeCommit是AWS自带的代码管理仓库&#xff0c;使用起来很不顺手&#xff0c;不如自建的gitlab仓库。不足之处很多&#xff0c;比如&#xff1a;缺乏可视化工具、用户管理麻烦&#xff0c;仓库管理手段贫瘠。 老板为了简单就使用了CodeCommit进行管理&#xf…

一个用于管理多个 Node.js 版本的安装和切换开源工具

大家好&#xff0c;今天给大家分享一个用于管理多个Node.js版本的工具 NVM&#xff08;Node Version Manager&#xff09;&#xff0c;它允许开发者在同一台机器上安装和使用不同版本的Node.js&#xff0c;解决了版本兼容性问题&#xff0c;为开发者提供了极大的便利。 在开发环…

Mongodb多键索引中索引边界的混合

学习mongodb&#xff0c;体会mongodb的每一个使用细节&#xff0c;欢迎阅读威赞的文章。这是威赞发布的第93篇mongodb技术文章&#xff0c;欢迎浏览本专栏威赞发布的其他文章。如果您认为我的文章对您有帮助或者解决您的问题&#xff0c;欢迎在文章下面点个赞&#xff0c;或者关…

ubuntu搭建harbor私仓

1、环境准备 链接: https://pan.baidu.com/s/1q4XBWPd8WdyEn4l253mpUw 提取码: 7ekx --来自百度网盘超级会员v2的分享 准备一台Ubuntu 机器:192.168.124.165 将上面两个文件考入Ubuntu上面 2、安装harbor 安装Docker Harbor仓库以容器方式运行,需要先安装好docker,参考:…