【系统设计】如何确保消息不会丢失?

news2024/10/5 21:15:44

一、前言

对于大部分业务系统来说,丢消息意味着数据丢失,是完全无法接受的。其实,现在主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。虽然不同的消息队列提供的 API 不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。

这节课我们就来讲一下,消息队列是怎么保证消息可靠传递的,这里面的实现原理是怎么样的。当你熟知原理以后,无论你使用任何一种消息队列,再简单看一下它的 API 和相关配置项,就能很快知道该如何配置消息队列,写出可靠的代码,避免消息丢失。

二、检测消息丢失的方法

我们说,用消息队列最尴尬的情况不是丢消息,而是消息丢了还不知道。一般而言,一个新的系统刚刚上线,各方面都不太稳定,需要一个磨合期,这个时候,特别需要监控到你的系统中是否有消息丢失的情况。

如果是 IT 基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。如果没有这样的追踪系统,这里我提供一个比较简单的方法,来检查是否有消息丢失的情况。

我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。

如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。

大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。

如果是在一个分布式系统中实现这个检测方法,有几个问题需要你注意。

首先,像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

三、确保消息可靠传递

讲完了检测消息丢失的方法,接下来我们一起来看一下,整个消息从生产到消费的过程中,哪些地方可能会导致丢消息,以及应该如何避免消息丢失。

  • 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

  • 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

3.1、生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。以 Kafka 为例,我们看一下如何可靠地发送消息:

同步发送时,只要注意捕获异常即可。

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println(" 消息发送成功。");
} catch (Throwable e) {
    System.out.println(" 消息发送失败!");
    System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。

producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
        System.out.println(" 消息发送成功。");
    } else {
        System.out.println(" 消息发送失败!");
        System.out.println(exception);
    }
});

如果超过一定超时时间还是失败,那就抛出异常,由发者自己在应用层面进行处理,手动重试发送 或者 记录失败消息后续补偿。

不过我们需要特别注意是,RocketMQ支持多种「消息类型」,但是并不是对所有「消息类型」 都会有「消息确认机制」和「失败重试机制」。

RocketMQ生产消息时,支持多种「消息类型」和「消息发送模式」。咱们白话为主,就不展开源码了,有兴趣同学可以参考

org.apache.rocketmq.client.producer.MQProducer这个接口定义即可。

消息类型:

  • 普通消息:发送普通消息,异常时默认重试

  • 普通有序消息:发送普通有序消息,通过指定「消息筛选器selector」,动态决定发送哪个队列。异常默认不重试,可以用户自己重试,并发送到其他队列

  • 严格有序消息:发送严格有序消息,通过指定队列,保证严格有序,异常默认不重试

消息发送模式:

  • 同步:调用发送消息方法后,同步阻塞,直到返回SendResult。配置retryTimesWhenSendFailed重试次数。

  • 异步:调用发送消息方法后,立即返回,发送结果会通过开发者自己注册的回调函数SendCallback进行处理。配置retryTimesWhenSendAsyncFailed重试次数。

  • 单向发送:这种方法完全不关心发送后的返回结果。显然,它具有最大吞吐量,但也存在消息丢失的潜在风险。

发送消息的模式和消息类型,可以通过 消息确认、mq-client自动「失败重试机制」、业务自定义重试 等方式,确保消息发送不丢失。

3.2、存储阶段

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

在Kafka中,可以设置以下参数:

  • replication.factor >= 3

该参数表示分区副本的个数(之前已经介绍过了副本的概念)。

replication.factor =3

建议设置大于1,至少有一个副本,随着业务的重要性提升可以增加副本数量。如果 Leader 副本出现不可用,Follower 副本会被选举为新的 Leader 副本继续提供服务。冗余存储数据,做备份本身就是一种提升消息可靠性的办法。

  • unclean.leader.election.enable = false

该参数表示有哪些 Follower 可以有资格被选举为 Leader 。

unclean.leader.election.enable = false

如果一个 Follower 的数据落后 Leader 太多,那么一旦它被选举为新的 Leader, 数据就会丢失,因此我们要将其设置为false,防止此类情况发生。

  • min.insync.replicas > 1

该参数表示消息至少要被写入成功到 ISR 多少个副本才算"已提交",默认值是1,建议设置 min.insync.replicas > 1,这样才可以提升消息持久性,保证数据不丢失。

min.insync.replicas = 2

另外我们还需要确保一下replication.factor > min.insync.replicas,如果相等,只要有一个副本不可以,整个 partition 将无法正常提供服务。

为了保证消息持久性的同时还要保证可用性,推荐设置成:replication.factor =min.insync.replicas +1, 最大限度保证系统可用性。

3.3、消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

同样,我们以用 Python 语言消费 RabbitMQ 消息为例,来看一下如何实现一段可靠的消费代码:

def callback(ch, method, properties, body):
    print(" [x] 收到消息 %r" % body)
    # 在这儿处理收到的消息
    database.save(body)
    print(" [x] 消费完成 ")
    # 完成消费业务逻辑后发送消费确认响应
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_consume(queue='hello', on_message_callback=callback)

你可以看到,在消费的回调方法 callback 中,正确的顺序是,先是把消息保存到数据库中,然后再发送消费确认响应。这样如果保存消息到数据库失败了,就不会执行消费确认的代码,下次拉到的还是这条消息,直到消费成功。

如果在尝试消费的过程中达到了最大重试次数(通常为16次),仍然无法成功消费,则消息将被发送到死信队列,以确保消息存储的可靠性。后续业务可以根据死信队列,来做相关补偿措施。

四、参考

  • Kafka 是如何做到消息不丢或不重复的?

  • 3分钟白话RocketMQ系列—— 如何保证消息不丢失

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1314537.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Initial用法-FPGA入门3

Initial是什么 FPGA Initial是一种在FPGA中进行初始化的方法。在FPGA设备上,初始值决定了逻辑门的状态和寄存器的初始值。FPGA Initial可以通过设置初始值来控制电路在上电后的初始状态。 Initial的作用 2.1,控制电路启动时的初始状态 通过设置FPGA Ini…

迅为RK3568开发板使用OpenCV处理图像-ROI区域-位置提取ROI

在图像处理过程中,我们可能会对图像的某一个特定区域感兴趣,该区域被称为感兴趣区域(Region of Interest, ROI)。在设定感兴趣区域 ROI 后,就可以对该区域进行整体操作。 位置提取 ROI 本小节代码在配套资料“iTOP-3…

KVM虚拟机console使用

注意这些设置都在你要进入虚拟机里设置,不是在你的物理机设置 首先debian12 需要设置 grep ttyS0 /etc/securetty #没有则加上 echo ttyS0 >> /etc/securetty #启动 systemctl start serial-gettyttyS0 systemctl enable serial-gettyttyS0#CentOS Stream …

MIT18.06线性代数 笔记3

文章目录 对称矩阵及正定性复数矩阵和快速傅里叶变换正定矩阵和最小值相似矩阵和若尔当形奇异值分解线性变换及对应矩阵基变换和图像压缩单元检测3复习左右逆和伪逆期末复习 对称矩阵及正定性 特征值是实数特征向量垂直>标准正交 谱定理,主轴定理 为什么对称矩…

智能优化算法应用:基于供需算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用:基于供需算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于供需算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.供需算法4.实验参数设定5.算法结果6.参考文献7.MA…

企业微信旧版-新版网络连接错误,无法登录的解决方案

一.企业微微信无法登录故障 二.解决方案 1.网上的解决方案 **检查网络连接:**确保你的计算机正常连接到互联网。尝试打开其他网页,以确保网络连接正常。 **防火墙和安全软件:**某些防火墙或安全软件可能会阻止企业微信的正常连接。请确保你…

CGAL的3D网格参数化

1、介绍 参数化曲面相当于找到一个从合适的域到曲面的单射映射。一个好的映射是在某种意义上最小化角度失真(保角参数化)或面积失真(等面积参数化)的映射。在这个包中,我们专注于参数化与圆盘或球体同胚的三角化曲面&a…

用CC三维建模建出的OSGB格式,用模方打不开,显示该路径包含OSGB瓦块数量0,是什么原因?

答:模方只识别tile命名的模型文件,此模型是不分块输出,要平面切块重新跑。 模方是一款针对实景三维模型的冗余碎片、水面残缺、道路不平、标牌破损、纹理拉伸模糊等共性问题研发的实景三维模型修复编辑软件。模方4.1新增自动单体化建模功能&…

Kubernetes 容器编排(2)

可视化部署 官方Dashboard 部署Dashboard # kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.4.0/aio/deploy/recommended.yaml # kubectl edit svc kubernetes-dashboard -n kubernetes-dashboard # 注意将 type: ClusterIP 改为 type: NodePo…

每周一算法:树形动态规划

树形动态规划 树形动态规划一般用于处理求树上最优值的问题。大多数动态规划问题都是在一维二维这种规则的背景下的,可以解决的问题比较局限,而树作为一种特殊的图,可以描述比较复杂的关系,再加上树的递归定义,是一种…

Linux-CentOS7(无图形界面版)部署stable-diffusion-webui 全过程

Linux-CentOS7(无图形界面版)部署Stable Diffusion webui 全过程 前置要求 git的版本不能是CentOS默认的版本(1.8),版本太老,在后面安装过程会失败。去github上下载最新的git源码包 安装成功显示版本号 …

用重建大师集群跑模型,在哪里可以设置联机?

答:工程路径、照片路径,引擎路径均设置为网络IP方式的路径,集群内的引擎都设置一样的路径就可以集群处理了。 重建大师是一款专为超大规模实景三维数据生产而设计的集群并行处理软件,输入倾斜照片,激光点云&#xff0c…

磁盘坏道扫描工具 Macrorit Disk Scanner v6.7.0 中文免费版 -供大家学习研究参考

非常方便实用的磁盘坏道修复软件。Wipe Bad Disk功能强大好用,通过特殊的算法来强制将硬盘的坏道删除清空格式化,从而拯救因产生坏道而不敢继续使用的硬盘!要注意的是经过这块软件清空的硬盘数据基本上是不能被恢复的,所以操作前请一定要备份…

【深度学习目标检测】七、基于深度学习的火灾烟雾识别(python,目标检测,yolov8)

YOLOv8是一种物体检测算法,是YOLO系列算法的最新版本。 YOLO(You Only Look Once)是一种实时物体检测算法,其优势在于快速且准确的检测结果。YOLOv8在之前的版本基础上进行了一系列改进和优化,提高了检测速度和准确性。…

基于Nexus搭建Maven私服基础入门

什么是Nexus?它有什么优势? 要了解为什么需要nexus的存在,我们不妨从以下几个问题来简单了解一下: 为什么需要搭建私服?如果没有私服会出现什么问题? 对于企业开发而言,如果没有私服,我们所有…

uniGUI for Delphi UniSweetAlert控件详解

UniSweetAlert是UniGUI后期版本新增的一个界面友好的消息提示和输入控件,是ShowMessageN的升级版,UniSweetAlert增加了更多的可控制属性。 属性介绍 1、AlertType:提示类型,分为atError、atSuccess、atInfo、atQuestion、atWarni…

JavaSE语法之七:封装

文章目录 一、封装的概念二、访问限定符三、封装扩展之包1. 包的概念2. 导入包中的类3. 自定义包4. 常见的包 四、实现封装五、static成员1. 再谈学生类成员变量2. static修饰成员变量3. static修饰成员方法4. static成员变量初始化 六、代码块1. 代码块概念及其分类2. 普通代码…

【VMware安装及虚拟机配置】

1. 下载VMware 进入 VMware Workstation 17 Pro下载链接 下拉到如下位置,点击DOWNLOAD 2. 安装VMware 参考:虚拟机VMware下载与安装教程 本次安装是vmware 17,安装步骤差不多,只参考第二部分即可。 3. 激活VMware 密钥&…

version `GLIBC_2.29‘ not found 的原因和怎么解决问题

程序上经常有在这台Linux上编译,然后放到另一个Linux上运行的情况。 如果Linux版本差别不大或都是ubuntu或centos系列还好。 如果不是一个系列很容易出现GLIBC 找不到的情况。 尤其是ubuntu上编译,然后放到centos系列。因为centos为了追求所谓的稳定&…

计算机组成原理-选择语句和循环语句的汇编表示

文章目录 选择语句jmpjxx示例:选择语句的机器级表示扩展:cmp指令的底层原理 循环语句使用条件转移指令实现循环用loop指令实现循环 选择语句 不一定知道指令的位置,所以jmp直接跳转到指令的位置很难办 jmp 标号相当于位置,名字…