消息投递:如何保证消息不丢失?

news2024/11/27 14:38:02

目录

前言

一、消息为什么会丢失?

二、在消息生产的过程中丢失消息

三、在消息队列中丢失消息

四、在消费的过程中存在消息丢失的可能


前言

在电商系统中,我们经常有这样的场景,在用户下单购买完商品后,需要给用户发送红包来促进用户继续消费。而发送红包的流程并不在下单的核心流程之内,这样的流程我们通常会采用消息队列的方式来异步处理,提高接口性能。

这时,你发现了一个问题:如果消息在投递的过程中发生丢失,那么用户就会因为没有得到红包而投诉。相反,如果消息在投递的过程中出现了重复,那么你的系统就会因为发送两个红包而损失。

那么我们如何保证,产生的消息一定会被消费到,并且只被消费一次呢?


一、消息为什么会丢失

如果要保证消息只被消费一次,首先就要保证消息不会丢失。那么消息从被写入到消息队列,到被消费者消费完成,这个链路上会有哪些地方存在丢失消息的可能呢?其实,主要存在三个场景:

  1. 消息从生产者写入到消息队列的过程。
  2. 消息在消息队列中的存储场景。
  3. 消息被消费者消费的过程。

接下来,我就针对每一个场景,详细地剖析一下,这样你可以针对不同的场景选择合适的,减少消息丢失的解决方案。

二、在消息生产的过程中丢失消息

在这个环节中主要有两种情况。

首先,消息的生产者一般是我们的业务服务器,消息队列是独立部署在单独的服务器上的。两者之间的网络虽然是内网,但是也会存在抖动的可能,而一旦发生抖动,消息就有可能因为网络的错误而丢失。

针对这种情况,我建议你采用的方案是消息重传:也就是当你发现发送超时后你就将消息重新发一次,但是你也不能无限制地重传消息。一般来说,如果不是消息队列发生故障,或者是到消息队列的网络断开了,重试 2~3 次就可以了。不过,这种方案可能会造成消息的重复,从而导致在消费的时候会重复消费同样的消息。比方说,消息生产时由于消息队列处理慢或者网络的抖动,导致虽然最终写入消息队列成功,但在生产端却超时了,生产者重传这条消息就会形成重复的消息,那么针对上面的例子,直观显示在你面前的就会是你收到了两个现金红包。

spring:
  # kafka 配置
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # 消费请求超时时间
        request:
          timeout:
            ms: 20000
        batch:
          #批量大小
          size: 1024
        #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
        #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
        #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
        #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
        #可以设置的值为:all, -1, 0, 1
        acks: 1
        #重试次数
        retries: 2

kafka中配置重试次数 spring.kafka.producer.retries = 2

那么消息发送到了消息队列之后是否就万无一失了呢?当然不是,在消息队列中消息仍然有丢失的风险。

三、在消息队列中丢失消息

拿 Kafka 举例,消息在 Kafka 中是存储在本地磁盘上的,而为了减少消息存储时对磁盘的随机 I/O,我们一般会将消息先写入到操作系统的 Page Cache 中,然后再找合适的时机刷新到磁盘上。

比如,Kafka 可以配置当达到某一时间间隔,或者累积一定的消息数量的时候再刷盘,也就是所说的异步刷盘。

不过,如果发生机器掉电或者机器异常重启,那么 Page Cache 中还没有来得及刷盘的消息就会丢失了。那么怎么解决呢?

你可能会把刷盘的间隔设置很短,或者设置累积一条消息就就刷盘,但这样频繁刷盘会对性能有比较大的影响,而且从经验来看,出现机器宕机或者掉电的几率也不高,所以我不建议你这样做。

如果你的电商系统对消息丢失的容忍度很低,那么你可以考虑以集群方式部署 Kafka 服务,通过部署多个副本备份数据,保证消息尽量不丢失。那么它是怎么实现的呢?

Kafka 集群中有一个 Leader 负责消息的写入和消费,可以有多个 Follower 负责数据的备份。Follower 中有一个特殊的集合叫做 ISR(in-sync replicas),当 Leader 故障时,新选举出来的 Leader 会从 ISR 中选择,默认 Leader 的数据会异步地复制给 Follower,这样在 Leader 发生掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。

由于默认消息是异步地从 Leader 复制到 Follower 的,所以一旦 Leader 宕机,那些还没有来得及复制到 Follower 的消息还是会丢失。为了解决这个问题,Kafka 为生产者提供一个选项叫做“acks”,当这个选项被设置为“all”时,生产者发送的每一条消息除了发给Leader 外还会发给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了,消息才会丢失。从上面这张图来看,当设置“acks=all”时,需要同步执行 1,3,4 三个步骤,对于消息生产的性能来说也是有比较大的影响的,所以你在实际应用中需要仔细地权衡考量。

spring:
  # kafka 配置
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
        #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
        #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
        #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
        #可以设置的值为:all, -1, 0, 1
        acks: 1
  1. 如果你需要确保消息一条都不能丢失,那么建议不要开启消息队列的同步刷盘,而是需要使用集群的方式来解决,可以配置当所有 ISR Follower 都接收到消息才返回成功。
  2. 如果对消息的丢失有一定的容忍度,那么建议不部署集群,即使以集群方式部署,也建议配置只发送给一个 Follower 就可以返回成功了。
  3. 我们的业务系统一般对于消息的丢失有一定的容忍度,比如说以上面的红包系统为例,如果红包消息丢失了,我们只要后续给没有发送红包的用户补发红包就好了。

四、在消费的过程中存在消息丢失的可能

我还是以 Kafka 为例来说明。一个消费者消费消息的进度是记录在消息队列集群中的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。这里面接收消息和处理消息的过程都可能会发生异常或者失败,比如说,消息接收时网络发生抖动,导致消息并没有被正确的接收到;处理消息时可能发生一些业务的异常导致处理流程未执行完成,这时如果更新消费进度,那么这条失败的消息就永远不会被处理了,也可以认为是丢失了。

所以,在这里你需要注意的是,一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后,消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后,还会重复地消费这条消息。

spring:
  kafka:
    consumer:
      group-id: launch-mq-prod
      # 是否在消费消息后将 offset 同步到 Broker,当 Consumer 失败后就能从 Broker 获取最新的 offset
      enable-auto-commit: false

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1170286.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

互联网医院|湖南互联网医院|智慧医疗改善就医服务

互联网医院系统,是指利用互联网技术和远程医疗技术,提供在线就诊、咨询、诊断和治疗等医疗服务的一种医疗模式。互联网医院系统实际上与医院的HIS系统很相似,是侧重服务于线上问诊的专业HIS,包含传统HIS的基本模块,如挂…

【数据结构】— —查找(折半查找,二叉排序树)

🎃个人专栏: 🐬 算法设计与分析:算法设计与分析_IT闫的博客-CSDN博客 🐳Java基础:Java基础_IT闫的博客-CSDN博客 🐋c语言:c语言_IT闫的博客-CSDN博客 🐟MySQL&#xff1a…

数据结构-二叉树·堆(顺序结构的实现)

🎉个人名片: 🐼作者简介:一名乐于分享在学习道路上收获的大二在校生 🐻‍❄个人主页🎉:GOTXX 🐼个人WeChat:ILXOXVJE🐼本文由GOTXX原创,首发CSDN&…

游戏推荐《塞尔达传说王国之泪》

塞尔达传说:王国之泪 播报编辑讨论12上传视频 2023年任天堂企划制作本部开发的动作冒险游戏 《塞尔达传说 王国之泪》是任天堂企划制作本部开发的动作冒险游戏,为《塞尔达传说》主系列的第20作、《塞尔达传说:旷野之息》的正统续篇&#xf…

块级作用域的理解

块级作用于的概念 由一对花括号{}中的语句集都属于一个块,在这个{}里面包含的块内定义的所有变量在代码块外都是不可见的,因此称为块级作用域。 作用域永远都是任何一门语言的重中之中,因为它控制着变量和参数的可见性和生命周期。讲到这里&…

uniapp打包安卓app后获取签名证书的SHA1,SHA256,MD5等信息

获取签名证书信息的方法有两种 1.dcloud开发者中心生成证书查看 在证书详情中可以查看,还可以下载证书 2.使用命令查看 1.先安装jre8,再配置一下环境变量 jre8下载地址 2.将打好的正式版app 后缀改为 .zip,解压 3.打开META-INF目录&…

基于卷积神经网络的抗压强度预测,基于卷积神经网络的抗折强度预测

目录 背影 卷积神经网络CNN的原理 卷积神经网络CNN的定义 卷积神经网络CNN的神经元 卷积神经网络CNN的激活函数 卷积神经网络CNN的传递函数 卷积神经网络CNN抗压强度预测 完整代码:基于卷积神经网络的抗压强度和抗折强度预测,基于CNN的抗压强度和抗折强度预测(代码完整,数据…

搭建第一个区块链网络与一键部署WeBASE步骤

官网 搭建第一个区块链网络 — FISCO BCOS v2 v2.9.0 文档 (fisco-bcos-documentation.readthedocs.io) 一键部署 — WeBASE v1.5.5 文档 (webasedoc.readthedocs.io) 步骤 默认如MySQL、Python、java等依赖已经引入 1.创建操作目录, 下载安装脚本 创建操作目录 cd ~ &a…

Hydra post登录框爆破

文章目录 无token时的Hydra post登录框爆破带Token时的Hydra post登录框爆破 无token时的Hydra post登录框爆破 登录一个无验证码和token的页面,同时抓包拦截 取出发送数据包:usernameadb&password133&submitLogin 将用户名和密码替换 userna…

安科瑞变电站综合自动化系统在青岛海洋科技园应用

安科瑞 耿敏花 摘 要:变电站综合自动化系统是将变电站内的二次设备经过功能的组合和优化设计,利用先进的计算机技术、通信技术、信号处理技术,实现对全变电站的主要设备和输、配电线路的自动监视、测量、控制、保护、并与上级调度通信的综合性…

教你怎么用Python每天自动给女朋友免费发短信

今天的教程就是教大家怎么发送免费短信给女朋友。 发送短信接口,我知道的常见的有两个平台,一个是 twilio,可以免费发短信 500 条,可发任意信息,一个是腾讯云,可以免费发短信 100 条,需要申请短…

【C语言初阶】函数

目录 一、函数是什么 二、C语言中函数的分类 2.1 库函数: strcpy memset 2.2 自定义函数 三、函数的参数 3.1 实际参数(实参) 3.2 形式参数(形参) 四、函数的调用 4.1 传值调用 4.2 传址调用 五、函数的嵌…

【广州华锐互动】VR历史古城复原:沉浸式体验古代建筑,感受千年风华!

在科技日新月异的今天,虚拟现实(VR)技术已经成为了我们生活中不可或缺的一部分。从娱乐游戏到医疗健康,从教育培训到房地产销售,VR技术的应用领域日益广泛。而近年来,VR技术在文化遗产保护和古迹复原方面的…

警惕Mallox勒索病毒的最新变种mallox,您需要知道的预防和恢复方法。

尊敬的读者: 在这个数字时代,恶意软件不再是仅限于技术领域的威胁,而是每个人都可能面临的潜在风险。其中,.mallox勒索病毒崭露头角,它不仅能够以不可思议的方式加密您的数据,还能要求您支付赎金以获取解密…

行业观察:数字化企业需要什么样的数据中心

伴随着数字经济在中国乃至全球的高速发展,数字化转型已经成为广大企业的必经之路。而作为数字经济的核心基础设施,数据中心充当了接收、处理、存储与转发数据流的“中枢大脑”,对驱动数字经济发展和企业数字化转型起到了极为关键的重要作用。…

vue中的rules表单校验规则使用方法 :rules=“rules“

一、el-form里面必写属性值 :ref"dataForm" // 提交表单时进行校验 :rules"rules" // return 下的校验规则 :model"userForm" // 绑定表单的值 <el-formref"dataForm" // 必写属性值:rules"rules"…

通过在Z平面放置零极点的来设计数字滤波器

文章来源地址&#xff1a;https://www.yii666.com/blog/393376.html 通过在Z平面放置零极点的来设计数字滤波器 要求&#xff1a;设计一款高通滤波器&#xff0c;用在音频信号处理过程中&#xff0c;滤掉100Hz以下的信号。 实现方法&#xff1a;通过在Z平面放置零极点的来设…

(景行锐创) 高性能计算平台 Pytorch 深度学习环境超详细教程

文章目录 前言1. 账号申请2. 登录高算平台网站3. 安装 Xshell&#xff0c;Xftp 软件4. 连接高算平台5. 安装 Anaconda6. 安装 CUDA7. 配置 cuDNN8. 安装 torch 和 torchvision9. 提交作业测试10. 解压与压缩文件11. 其他结语 前言 目前一些学校为了便于师生进行大规模的计算任…

Aop自定义注解生成日志

Aop自定义注解生成日志 1.编写自定义注解 //表示此注解可以标注在方法上 Target(ElementType.METHOD) //运行时生效 Retention(RetentionPolicy.RUNTIME) public interface OpetionLog {//定义一个变量&#xff0c;可以接收参数String value() default "";}2.Cont…

Leetcode 剑指 Offer II 051. 二叉树中的最大路径和

题目难度: 困难 原题链接 今天继续更新 Leetcode 的剑指 Offer&#xff08;专项突击版&#xff09;系列, 大家在公众号 算法精选 里回复 剑指offer2 就能看到该系列当前连载的所有文章了, 记得关注哦~ 题目描述 路径 被定义为一条从树中任意节点出发&#xff0c;沿父节点-子节…