重复消费和堆积

news2024/11/23 22:37:24

接受消息会重复这一现状,然后通过一些方法来消除重复消息对业务的影响

利用幂等性解决重复消息问题

幂等(其任意多次执行所产生的影响均与一次执行的影响相同。)

一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。

从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。

实现幂等性操作,最好的发那首歌hi就是 从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作

一、利用数据库唯一约束实现幂等

消息id和 账户id作为唯一标识,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。

只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

二、为更新的数据设置前置条件

​ 比如,“将账户 X 的余额增加 100 元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500 元,将余额加 100 元”,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。

​ 如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

三、记录并检查操作

记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的。首先,给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。

比如说,对于同一条消息:“全局 ID 为 8,操作为:给 ID 为 666 账户增加 100 元”,有可能出现这样的情况:

  • t0 时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“账户增加 100 元”;
  • t1 时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,因为这个时刻,Consumer A 还未来得及更新消息执行状态。

这样就会导致账户被错误地增加了两次 100 元,这是一个在分布式系统中非常容易犯的错误,一定要引以为戒。

对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题。

消息积压

消息积压的直接原因,一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压。

优化性能来避免消息积压

发送端性能优化

​ 发送端业务代码的处理性能,实际上和消息队列的关系不大,因为一般发送端都是先执行自己的业务逻辑,最后再发送消息。如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。

我们之前的课程中讲过 Producer 发送消息的过程,Producer 发消息给 Broker,Broker 收到消息后返回确认响应,这是一次完整的交互。假设这一次交互的平均时延是 1ms,我们把这 1ms 的时间分解开,它包括了下面这些步骤的耗时:

  • 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
  • 发送消息和返回响应在网络传输中的耗时;
  • Broker 处理消息的时延。

如果是单线程发送,每次只发送 1 条消息,那么每秒只能发送 1000ms / 1ms * 1 条 /ms = 1000 条 消息,这种情况下并不能发挥出消息队列的全部实力。

无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。至于到底是选择批量发送还是增加并发,主要取决于发送端程序的业务性质。简单来说,只要能够满足你的性能要求,怎么实现方便就怎么实现。

消费端性能优化

​ 使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。如果这种性能倒挂的问题只是暂时的,那问题不大,只要消费端的性能恢复之后,超过发送端的性能,那积压的消息是可以逐渐被消化掉的。

​ 要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障。

​ 那么在设计之初,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。

​ 消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,**在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。**如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。

image-20230620154415335

它收消息处理的业务逻辑可能比较慢,也很难再优化了,为了避免消息积压,在收到消息的 OnMessage 方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。然后它可以启动很多的业务线程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单个 Consumer 不能并行消费的问题。

这个方法是不是很完美地实现了并发消费?请注意,这是一个非常常见的错误方法! 为什么错误?因为会丢消息。如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失

消息积压了该如何处理

还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一个时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要你在短时间内找到消息积压的原因,迅速解决问题才不至于影响业务。

导致突然积压的原因肯定是多种多样的,不同的系统、不同的情况有不同的原因,不能一概而论。但是,我们排查消息积压原因,是有一些相对固定而且比较有效的方法的。

能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。

大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。

如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

还有一种不太常见的情况,你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。

论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。

如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/667563.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

flutter七牛云上传sdk插件qiniu_flutter_sdk使用

flutter七牛云上传sdk插件qiniu_flutter_sdk使用 最近在拆分代码,将上传组件设置成插件,下面记录下实现过程。 一、创建flutter_plugin上传插件 这里Android Studio使用创建plugin 填写一下信息 Project nameProject locationDescriptionProject typ…

单片机入门所需的基础数电和模电知识

要学习单片机并入门相关领域,推荐掌握以下数电和模电的基础知识: 数电知识: 布尔代数和逻辑门:了解布尔代数的基本概念和逻辑门的工作原理,包括与门、或门、非门、与非门、或非门、异或门等。 时序逻辑和时钟信号&a…

AIOps介绍

AIOps介绍 AIOps是指人工智能运维(Artificial Intelligence for IT Operations)的缩写。它是将人工智能(AI)和机器学习(ML)技术应用于IT运维领域的一种方法。 传统的IT运维通常需要人工监测和管理大量的系…

脑机接口科普0022——黑门02:伦理道德问题

本文禁止转载!!!! 脑机接口这个技术,是属于黑科技技术中的一种。 现在已经有很多专家,以及机构,提出脑机接口的存在的一些问题。法律是一块的问题,伦理道德是另一块的问题。 虽然…

苹果iPhone14卡死怎么办?解决办法分享!

正常使用的iPhone14虽然很少会出现卡死的情况,但iPhone就是一台微型电脑,像电脑一样“死机”也不是没可能。 有用户称在使用iPhone14时出现突然出现弹出的提示框无法点击取消,锁屏也解决不了死机的问题。同时又因为屏幕其他区域不能操作&…

MySQL免安装配置教程(win10)

一、下载安装包 1.1、下载zip包 打开官网地址下载zip安装包,这里下载的版本是5.7,可自行选择。 对应下载网址:https://downloads.mysql.com/archives/community/ 根据自己电脑进行选择对应安装包 若需要下载msi安装包(图形化界…

Mysql漏洞处理之升级版本到5.7.42过程指导手册

一、背景 某次安全漏扫,发现MySQL大量漏洞,基于Mysql之用于内网,且版本确实有点旧,考虑升级,综合漏洞分析,只能升级到最新版5.7.42和8.0.33,现场环境:Mysql 5.7.28、5.7.20和mysql&…

高等职业学院校园IP网络广播应用-河北资源环境职业技术学院校园IP广播

职业院校大学校园IP网络广播在河北资源环境职业技术学院产教融合基地的应用 北京海特伟业科技任洪卓发布于2023年6月20日 一、高等职业院校校园IP网络广播系统-广播中心 河北资源环境职业技术学院产教融合基地-高等职业院校校园IP网络广播系统是基于TCP/IP协议校园局域网构建…

通过GPIO子系统编写LED驱动,应用程序控制LED灯亮灭

1、在内核设备树中添加设备信息: LED1的设备树编写需要参考内核的帮助文档: linux-5.10.61/Documentation/devicetree/bindings/gpio 在根节点内部添加led灯设备树节点 :~/linux-5.10.61/arch/arm/boot/dts $ vi stm32mp157a-fsmp1a.dts myled.c #in…

渗透测试思路总结

一、说明 《Metasploit 渗透测试魔鬼训练营》等书已经对渗透测试的步骤流程划分得比较合理透彻了,但感觉在多次通读该类书藉之后仍总感觉不得要领----要对一台给定的主机进行渗透还是不懂到底该如何着手。想来主要是存在以下两个问题。 第一个是在渗透操作系统时&…

Yolov8优化:引入Soft-NMS,提升密集遮挡场景检测精度

1.Soft-NMS介绍 论文地址:https://arxiv.org/pdf/1704.04503.pdf NMS需要优化的参数: IoU 的阈值是一个可优化的参数,一般范围为0~0.5,可以使用交叉验证来选择最优的参数。 R-CNN会从一张图片中找出n个可能是物体的矩形框,然后为每个矩形框为做类别分类概率: 就…

maven测试依赖的排除

1、概念 当 A 依赖 B,B 依赖 C 而且 C 可以传递到 A 的时候,A 不想要 C,需要在 A 里面把 C 排除掉。而往往这种情况都是为了避免 jar 包之间的冲突。 所以配置依赖的排除其实就是阻止某些 jar 包的传递。因为这样的 jar 包传递过来会和其他 …

parallelStream与CompletableFuture

1 了解parallelStream parallelStream怎么实现的并行处理呢? 其底层是Fork/Join并行计算框架的默认线程池,默认线程池的数量就是处理器的数量,可以使用系统属性:-Djava.util.concurrent.ForkJoinPool.common.parallelism{N} 调整…

【数据库】MySQL 高级(进阶) SQL 语句

文章目录 前提条件一、常用查询1. SELECT(显示查询)2. DISTINCT(不重复查询)3. WHERE(有条件查询)4. AND/OR(且/或)5. IN (显示已知值的字段)6. BETWEEN&…

自驾出游擅自使用对讲机属于违法行为?

周末或节假日大多数人通常都会选择自驾出游,或是叫上自己的好友一起出游,这个时候就可以组成一个车队。为了沟通起来更方便,大家一般都喜欢配个对讲机。 不过据调查显示,大多数人并不认为擅自使用对讲机算违法行为。在多个电商平…

【计算机视觉】OFA:通过一个简单的seq2seq的学习框架来统一架构、任务和模态

文章目录 一、导读二、摘要三、介绍四、OFA4.1 I/O & Architecture4.1.1 I/O4.1.2 Architecture 4.2 Tasks & Modalities4.3 预训练数据集4.4 训练与推理4.5 缩放模型 五、实验结果5.1 跨模态任务的结果5.2 单模态任务的结果5.3 zero-shot学习和任务迁移 六、测试结果七…

测试不为人知的小秘密,你占了几个?

1、线上出现小bug,小本本记下,后面偷偷给开发提个bug。 2、操作服务器时,把数据库玩坏了,系统玩崩了,加班熬夜默默的抢救修复。 3、和开发吵了一架,然后重点照顾了他负责的模块,找了一堆bug&a…

智慧无线灌溉在园林中的应用

智慧无线灌溉技术是解决人们生活用水与园林灌溉之间用水矛盾的有效措施之一,对提升园林灌溉效率和节约水资源有着重要的意义。 智慧无线灌溉系统可以自动感知园林植物周围的环境温度、水分等要素,并对感知到的要素进行详细分析和判断,以确定…

小白到运维工程师自学之路 第四十一集 (shell脚本的基本使用)

一、概述 Shell是一种命令行解释器,它是一种编程语言,用于在操作系统上执行命令和脚本。Shell语言是一种脚本语言,它可以用于自动化任务、批处理、系统管理和编写简单的程序。Shell语言通常用于Unix和Linux操作系统中,但也可以在其…

MySQL优化--索引创建原则,索引什么时候会失效

目录 索引创建原则 面试回答 索引什么时候会失效 面试回答 索引创建原则 1). 针对于数据量较大,且查询比较频繁的表建立索引。 2). 针对于常作为查询条件(where)、排序(order by)、分组(group by&…