消息队列进阶-3.消息队列常见问题解决方案

news2025/1/11 7:48:25
  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码、Kafka原理
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

  • 确保消息可靠传递
    • 如何知道消息丢失
    • 确保消息可靠传递
  • 消息幂等:消息不被重复消费
    • 对业务幂等的理解
    • 消息投递的几种语义
    • 用幂等性解决重复消费问题
      • 利用数据库的唯一约束实现幂等
      • 为更新的数据设置前置条件
      • 记录并检查操作
  • 消息积压问题解决方案
    • 问题分析
    • 解决方案
    • 处理经验
    • 实战举例
  • 如何确保消息的顺序消费
    • 顺序消费的难点
    • 在这里插入图片描述
    • MQ对顺序消费的支持
    • 从业务角度保证顺序消费

确保消息可靠传递

如何知道消息丢失

在这里插入图片描述

解决思路:

利用消息队列的有序性来验证是否有消息丢失。 在消息生产端,给每个发出的消息都指定一个附加一个连续底层的版本号,然后在消费端检验序号的连续性。

落地方案:

利用拦截器机制,在Producer发送消息之前的拦截器中将序号注入到消息中,在Consumer收到消息的拦截器中检测序号的连续性。

细节问题:

不能保证在topic是严格顺序的,只能保证Queue/分区的消息是有序的,发消息的必须要指定的分区,在每个分区单独监测消息序号的连续性。

一般服务都是多实例进行部署,不好协调全局的Producer的发送顺序,每个Producer分别生成各自的消息序号,附加Producer的标识,在Consumer端按照每个Producer分别来检测序号的连续性。

Consumer实例的数量最好和分区数量保持一致。

确保消息可靠传递

在这里插入图片描述

  • 生产阶段:

通过请求、确认机制来保证消息的可靠传递

  • 消息存储阶段:

如果对消息的可靠性要求非常高,通过调整Broker的参数避免因为服务器故障而丢失消息。

在RocketMQ中,可以将刷盘的方式 flushDiskType 配置为 SYCN_FLUSH 同步刷盘。

如果Broker是多个节点组成的集群,至少将消息发送到2个以上的节点,再给客户端发送确认响应。

  • 消费阶段:

客户端从MQ拉取消息后,执行用户的业务逻辑成功之后,再给MQ发送消费确认响应。

消息幂等:消息不被重复消费

应用的幂等是在分布式系统涉及时必须要考虑的一个方面,如果对幂等没有额外的考虑,那么在消息失败重新投递,或者远程服务超时重试时,可能会出现很多诡异的问题。

对业务幂等的理解

体现对于不满足幂等性的业务,在消费重复消费,会出现数据的不一致,导致业务数据错乱。

幂等数学上的概念,对一个函数(方法),使用相同的1参数,执行多次,获得的结果是一致的。

HTTP协议中有四个方法,GET/POST/PUT/DELETE,其中GET 和 DELLETE 是幂等的,而POST方法不是幂等的。

幂等的Update:

update order set status = 1 where id = 1001;

不符合幂等涉设计:

update order set price = price + 1 where id = 1001

消息投递的几种语义

为了进一步规范消息的调用,业界有许多消息队列的应用协议,其中也对消息投递标准做了一些约束。

在这里插入图片描述

  • At most once

消息在传递时,最多会被送达一次。消息可能会丢失,但是永远不会出现重复消息的问题。比如日志指标监控信息。

  • At least once

消息在传递时,至少会被送达一次。消息肯定不会丢,可能会出现重复消费。

绝大多数的应用中,都是使用At Least Once,MQ产品都支持该级别。

  • Exactly once

每条消息肯定会被传输一次且仅传输一次,并且保证送达,因为涉及发送端和生产端的各种协同机制,绝对的Exactly once级别很难实现的,通用的Exactly once方案几乎不可能存在。

用幂等性解决重复消费问题

如果我们系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次 和 消费多次对系统的影响时完全一样的。也就可以认为,消费多次等于消费一次。

从对系统的影响结果来说:

At least once + 幂等消费 = Exactly once

利用数据库的唯一约束实现幂等

举个例子来说明一下。在不考虑并发的情况下,将账号X的余额设置为100元,执行一次后对系统的影响时,账户X的余额变成了100元。只要提供的参数是100元不变,那即使再执行多少次,账户X的余额始终都是100元,不会变化,这个操作就是一个幂等的操作。

再举一个例子,将账户X的余额加100元,这个操作它就不是幂等的,每执行一次,账户余额就增加100元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。

可以限定,对于每个转账单每个账户只可以执行一次变更操作。转账流水表:转账单ID、账户ID、变更金额,联合主键(转账单ID、账户ID)

或者使用Redis的SETNS命令。

为更新的数据设置前置条件

另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。

将账户 X 的余额增加为 100 元,增加一个前置条件:如果账户X的余额是250,才执行将余额增加100操作。在消息中带上余额,如果余额和数据库中一致,才执行。

通用解决方案呢:给数据增加一个版本号属性,通常表现为在表中添加一个版本号的列,每次更新之前,比较当前数据的版本号和消息中的版本号是否一致,如果一致则更新,如果不一致就拒绝更新数据,更新数据的同时需要将版本号+1,实现幂等设计。

记录并检查操作

如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

  • 检查消费状态
  • 更新数据
  • 设置消费状态

如上的三个操作要保证原子性,才能实现幂等性。

如果不能保证原子性的话,可能会出现下面的问题。

消息的全局ID为250,操作:给ID为 38 的账号增加100元:

t0:消费者A收到消息,检查执行状态,发现消费未处理,开始执行 “ 账户增加100 ” 操作。

t1:消费者B收到消息,检查执行状态,发现消费未处理。

执行两次!

消息积压问题解决方案

问题分析

如果出现了积压,那一定是性能问题,想要解决消息从生产到消费上的性能问题,就首先要知道那些环节可能出现消息积压,然后再考虑如何解决。

  • 跟消息生产者没有关系
  • 跟消息队列本身没有关系
  • 消息消费者的消费能力不足引起的

解决方案

如果是突发问题,临时扩容,增加消费者的数量。通过扩容和降级承担流量,应急问题的处理。

其次,才是排查解决异常问题。监控、日志分析是否消费端的业务逻辑代码出现了问题,优化消费端的业务处理逻辑。

最后,如果消费端处理不足,水平扩容提升消费端并发处理能力。在扩容消费者实例的同事,必须要同步扩容Topic分区的数量,确保消费者的实例数和分区数是相同的,分区是单线程消费的。

在涉及系统的时候,一定要保障消费端的消费的性能要高于生产端生产的性能。

处理经验

还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一个时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要你在短时间内找到消息积压的原因,迅速解决问题才不至于影响业务。

如何排查消息积压的原因?

如果赶上大促场景,扩容消费实例,如果服务器资源不足,系统降级:关闭一些不重要的业务,减少发送方的数据量,最低限度的去运行。

实战举例

在高并发的场景中,消息积压问题,可以说如影随形,真的没办法从根本上解决。表面上看,已经解决了,但后面不知道什么时候,就会冒出一次。

参考 《苏三说技术》所举的实际情况

有天下午,产品过来说:有几个商户投诉过来了,他们说菜品有延迟,快查一下原因。

这次问题出现得有点奇怪。

为什么这么说?

首先这个时间点就有点奇怪,平常出问题,不都是中午或者晚上用餐高峰期吗?怎么这次问题出现在下午?

根据以往积累的经验,我直接看了kafka的topic的数据,果然上面消息有积压,但这次每个partition都积压了十几万的消息没有消费,比以往加压的消息数量增加了几百倍。这次消息积压得极不寻常。

我赶紧查服务监控看看消费者挂了没,还好没挂。又查服务日志没有发现异常。这时我有点迷茫,碰运气问了问订单组下午发生了什么事情没?他们说下午有个促销活动,跑了一个JOB批量更新过有些商户的订单信息。

这时,我一下子如梦初醒,是他们在JOB中批量发消息导致的问题。怎么没有通知我们呢?实在太坑了。

虽说知道问题的原因了,倒是眼前积压的这十几万的消息该如何处理呢?

此时,如果直接调大partition数量是不行的,历史消息已经存储到4个固定的partition,只有新增的消息才会到新的partition。我们重点需要处理的是已有的partition。

直接加服务节点也不行,因为kafka允许同组的多个partition被一个consumer消费,但不允许一个partition被同组的多个consumer消费,可能会造成资源浪费。

因此,为了保证Kafka系统的稳定性和性能,不建议将同一个分区分配给多个消费者组。在实际应用中,可以根据实际需求和消费者组的数量,合理调整分区数量,以提高系统的并发处理能力和负载能力。

看来只有用多线程处理了。

为了紧急解决问题,我改成了用线程池处理消息,核心线程和最大线程数都配置成了50。

大致用法如下:

  • 先定义一个线程池:
@Configuration
public class ThreadPoolConfig {

    @Value("${thread.pool.corePoolSize:5}")
    private int corePoolSize;

    @Value("${thread.pool.maxPoolSize:10}")
    private int maxPoolSize;

    @Value("${thread.pool.queueCapacity:200}")
    private int queueCapacity;

    @Value("${thread.pool.keepAliveSeconds:30}")
    private int keepAliveSeconds;

    @Value("${thread.pool.threadNamePrefix:ASYNC_}")
    private String threadNamePrefix;

    @Bean("messageExecutor")
    public Executor messageExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
  • 再定义一个消息的consumer:
@Service
public class MyConsumerService {
    @Autowired
    private Executor messageExecutor;
    
    @KafkaListener(id="test",topics={"topic-test"})
    public void listen(String message){
        System.out.println("收到消息:" + message);
        messageExecutor.submit(new MyWork(message);
    }
}
  • 在定义的Runable实现类中处理业务逻辑:
public class MyWork implements Runnable {
    private String message;
    
    public MyWork(String message) {
       this.message = message;
    }

    @Override
    public void run() {
        System.out.println(message);
    }
}

果然,调整之后消息积压数量确实下降的非常快,大约半小时后,积压的消息就非常顺利的处理完了。

而对于RocketMQ来说:允许多个消费者同时消费一个队列。这种消费模式通常被称为“共享模式消费”或“广播模式消费”。

在共享模式下,多个消费者可以同时从同一个队列中接收消息。这对于需要水平扩展消费能力的场景非常有用,因为可以简单地增加消费者实例来提高整体的消费能力。每个消费者都会接收到队列中的所有消息的副本,但是每条消息只会被其中一个消费者处理。

在广播模式下,多个消费者也可以同时消费同一个队列,不同的是每个消费者都会独立地接收队列中的所有消息。这种模式适用于需要多个消费者独立处理同一份消息的场景,比如日志分析系统等。

如何确保消息的顺序消费

消息投递的顺序!

顺序消费的难点

在这里插入图片描述

MQ对顺序消费的支持

在这里插入图片描述

Kafka:在同一个分区中天然有序,如果是多分区可以通过定制的分发策略,将同一类消息分发到同一个分区中。比如订单场景,写入Kafka时通过订单ID进行分发,保证同一个订单ID的消息发送到同一个分区中。同一个订单下的消息1和消息2,如果1失败了,重发的时候会出现在消息2的后面。max.in.flight.request.per.connection 该参数可以控制客户端在等待响应之前可以发送的未确认请求的数量。

Rocket:在同一个Queue中保证有序性,如果把对应一个业务主键的消息都路由到同一个Queue中,可以实现消息的有序传输。

从业务角度保证顺序消费

  • 根据不同的业务场景,以发送端或者消费端时间戳为准
  • 每次消息发送时生成唯一递增的ID
  • 通过缓存时间戳的方式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1267332.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

产品软文撰写思路,媒介盒子分享

产品软文的目的是为了将产品卖出去,然而想把产品卖出去,不是靠几句话就能实现的,还需要进行多方面分析,今天媒介盒子就来和大家分享:产品软文撰写思路。 一、 产品体验分享 自己要成为自己产品的深度用户并不是一句空…

洗牙器亚马逊UL1431测试报告检测标准

洗牙器是一种电动口腔清洁工具,用于移除食物残渣和牙菌斑,提高口腔卫生水平。 亚马逊要求商家上架的产品检测报告必须是ISO17025/ILAC ISO 17025标准认可的实验室出具的合格报告。 UL测试报告是根据产品选用相应的UL标准进行测试合格后,出具…

【MySQL源码】使用CLion 远程调试MySQL源码

目录 0 准备工作 1 IDE 2 下载MySQL源码 ​编辑 一 配置CLion 1 添加远程服务器 2 配置远程服务器环境 3 升级gdb版本 4 升级CMake版本 5 修改远程服务器文件上传的目录的对应关系 5 配置cmake 7 初始化MySQL 8 启动MySQL 作为DBA工作多年,如果还是停…

InnoDB存储引擎中的锁

文章目录 概要一、需要解决的问题二、共享锁和独占锁1.1 锁定读1.2 表级别的共享锁、独占锁 三、行锁3.1 数据准备3.2 几种常见的行级锁3.3 行锁升级为表锁 概要 关于MySQL涉及到的锁,大致可以总结如下: MyISAM存储引擎在开发过程中几乎很少使用了&…

虾知(知虾):助力Shopee卖家实现市场分析和选品策略优化的神器

在如今的电商市场竞争激烈的背景下,卖家需要借助数据分析工具来了解市场需求、热销产品和竞争状况,以制定明智的选品策略。而虾知(知虾)作为一款专为Shopee卖家设计的数据分析工具,为卖家提供全面的市场分析、商品分析…

InstructDiffusion-多种视觉任务统一框架

论文:《InstructDiffusion: A Generalist Modeling Interface for Vision Tasks》 github:https://github.com/cientgu/InstructDiffusion InstructPix2Pix:参考 文章目录 摘要引言算法视觉任务统一引导训练集重构统一框架 实验训练集关键点检测分割图像…

Could NOT find resource [logback-test.xml]

修改 之后就可以正常启动了

wsj0数据集原始文件.wv1.wv2转换成wav文件

文章目录 准备一、获取WSJO数据集二、安装sph2pipe三、转换代码四、结果展示 ​ 最近做语音分离实验需要用到wsj0-2mix数据集,但是从李宏毅语音分离教程里面获取的wsj0-2mix只有一部分。从网上获取到了完整的WSJO数据集后,由于原始的语音文件后缀是wv1或…

Linux安装mongodb数据库(详细)

一、下载安装包 本文使用 tgz 方式,根据服务器类型在官网下载 MongoDB 安装包。官方地址:https://www.mongodb.com/try/download/community 下载方式如图所示: 选择版本 关于 MongoDB 的版本选择,参见如下版本差异: 1、将从官…

推荐几款免费的智能AI伪原创工具

在当今信息快速传播的时代,创作者们常常为了在激烈的竞争中脱颖而出而苦苦挣扎,而其中的一项挑战就是创作出独具创意和独特性的内容。然而,时间有限的现实让很多人望而却步。在这个背景下,免费在线伪原创工具成为了创作者们的得力…

csapp-linklab之第二阶段“输出学号”实验报告

本阶段主题是链接中的“重定位”。两次重定位,一次是绝对地址重定位,一次是PC相对地址重定位。 本题目标依旧是输出学号,反汇编phase2.o,看到学号“0000000000”已经存放在只读数据区了。现在任务就是改do_pheas的指令和重定位表…

示波器高压探头的操作说明及使用注意事项

操作说明: 连接探头衰减端的地线(鳄鱼夹)到好的接地点或可靠的接地测试端。连接BNC头到示波器的BNC输入端口。选择示波器要求的量程范围。 注意:请务必在连接测试前把高压电源关闭。 注意事项: 请勿将测试设备的接地线从地面接线柱上移开。…

“创新视频封面设计,轻松提取其他视频第一帧,让你的视频更具吸引力!“

你是否曾经为如何为自己的视频定制封面而烦恼?现在,我们为你推荐一款全新的视频封面提取工具,让你的视频封面设计更加简单、快捷! 首先,运行媒体梦工厂,在板块栏路选择“视频封面”板块。并点击“提取封面…

【C++】杨辉三角详解和C++代码示例

杨辉三角的每行第i个数是由上一行的第i-1个数和第i个数相加得到的&#xff0c;且每行的第一个数和最后一个数都是1&#xff0c;每行的中间个数等于它两肩上的数字相加。 目录 C代码输出结果8行输出15行输出25行输出 C代码 #include <iostream> #include <vector>…

如何用CHAT写一篇儿童地理入门的文章?

问CHAT&#xff1a;从初中地理知识的角度&#xff0c;以"地球&#xff0c;我的家“为标题写一篇儿童地理入门的文章&#xff0c;主要概述地球的地理特点&#xff0c;引起孩子对地球地理知识的兴趣。可以用这些相关生活场景来延伸&#xff1a;在学校上地理课时学习关于地球…

ThinkPHP的方法接收json数据问题

第一次接触到前后端分离开发&#xff0c;需要在后端接收前端ajax提交的json数据&#xff0c;开发基于ThinkPHP3.2.3框架。于是一开始习惯性的直接用I()方法接收到前端发送的json数据&#xff0c;然后用json_decode()解析发现结果为空&#xff01;但是打印出还未解析的值却打印得…

第二十章Java博客

如果一次只完成一件事情&#xff0c;很容易实现。但现实生活中&#xff0c;很多事情都是同时进行的。Java中为了模拟这种状态&#xff0c;引入了线程机制。简单地说&#xff0c;当程序同时完成多件事情时&#xff0c;就是所谓的多线程。多线程应用相当广泛&#xff0c;使用多线…

【LeetCode刷题】数组篇1

&#x1f387;数组简单题Part &#x1f308; 开启LeetCode刷题之旅 &#x1f308; 文章目录 &#x1f387;数组简单题Part&#x1f370;1.两数之和&#x1f451;思路分析1.暴力法2.哈希表法 &#x1f370;26.删除有序数组中的重复项&#x1f451;思路分析1.双指针2.利用vector…

linux安装docker(脚本一键安装配置docker)

1、创建脚本 vi initDocker.sh #安装前先更新yum&#xff0c;防止连接镜像失败 yum -y update#卸载系统之前的docker&#xff08;可选择&#xff0c;我这里直接注释了&#xff09; #yum remove docker docker-client docker-client-latest docker-common docker-latest docke…

自动化接口测试之Postman(一篇搞定)

该篇文章针对已经掌握 Postman 基本用法的读者&#xff0c;即对接口相关概念有一定了解、已经会使用 Postman 进行模拟请求的操作。 当前环境&#xff1a; Window 7 - 64 Postman 版本&#xff08;免费版&#xff09;&#xff1a;Chrome App v5.5.3 不同版本页面 UI 和部分…