消息队列使用中防止消息丢失的实战指南

news2025/1/14 11:58:28

消息队列使用中防止消息丢失的实战指南

在分布式系统架构里,消息队列起着举足轻重的作用,它异步解耦各个业务模块,提升系统整体的吞吐量与响应速度。但消息丢失问题,犹如一颗不定时炸弹,随时可能破坏系统的数据一致性与业务完整性。接下来,详细聊聊在使用消息队列时,如何全方位筑牢防线,杜绝消息丢失。

在这里插入图片描述

一、生产者端防丢策略

  1. 事务机制
    主流的消息队列如 Kafka 、 RabbitMQ 都支持事务特性。以 RabbitMQ 为例,生产者开启事务后,发送消息、提交事务这一系列操作被纳入原子操作范畴。代码示例(Python 使用 pika 库):
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.tx_select()
try:
    channel.basic_publish(exchange='test_exchange', routing_key='test_key', body='message')
    channel.tx_commit()
except Exception as e:
    channel.tx_rollback()
finally:
    connection.close()

采用事务机制,只要消息没能成功被队列接收,生产者就能回滚操作,确保消息不丢失,但事务操作会带来一定性能开销,需权衡使用。
2. 异步确认机制
许多高性能场景下,会选用异步确认来替代事务。还是以 RabbitMQ 举例,生产者发送一批消息后,无需等待 broker 逐个确认,继续发送后续消息,利用回调函数处理 broker 的确认信息:

import pika
from concurrent.futures import ThreadPoolExecutor

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.confirm_delivery()
def on_ack(method_frame):
    print('Message confirmed:', method_frame.delivery_tag)

def on_nack(method_frame):
    print('Message not confirmed:', method_frame.delivery_tag)

with ThreadPoolExecutor(max_workers = 10) as executor:
    for i in range(10):
        message = f"Message {i}"
        future = executor.submit(channel.basic_publish, exchange='test_exchange', routing_key='test_key', body=message)
        future.add_done_callback(lambda f: (on_ack(f.result()) if f.result() else on_nack(f.result())))

connection.close()

这种方式能极大提升发送效率,同时借助回调精准定位未确认消息,及时补发。

二、消息队列中间件自身保障

  1. 持久化存储
    消息队列服务器需将消息持久化到磁盘,以防节点故障、重启导致数据丢失。Kafka 使用日志文件的方式,把每个分区的消息顺序写入磁盘,配合定期刷盘策略,确保消息稳妥落地;RabbitMQ 则提供了队列持久化、消息持久化两种配置选项。例如在 RabbitMQ 管理界面创建队列时勾选“Durable”选项,发送消息时设置 delivery_mode 为 2 ,就能开启消息持久化:
channel.basic_publish(exchange='test_exchange', routing_key='test_key', body='message', properties=pika.BasicProperties(delivery_mode = 2))
  1. 集群与副本机制
    搭建消息队列集群是必不可少的高可用手段。像 Kafka 天生为分布式架构,通过多副本机制,每个分区的副本分布在不同 broker 节点上,一旦某个节点宕机,其他副本可无缝接替服务,保障消息不丢失与服务持续可用。 RabbitMQ 也支持集群模式,实现数据冗余与故障转移。

三、消费者端防丢策略

  1. 手动确认机制
    消费者接收消息后,默认自动确认会在消息刚接收就通知队列已处理,这很容易在后续业务处理出错时丢失消息。手动确认则把控制权交回给消费者,例如 RabbitMQ 中:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_consume(queue='test_queue', on_message_callback=lambda ch, method, properties, body: print(body), auto_ack=False)
channel.start_consuming()

消费者成功处理完业务逻辑后,再调用 basic_ack 方法告知队列消息处理完成,要是处理过程报错,拒绝确认,队列会把消息重新分发给其他消费者或者留在队列等待重试。
2. 幂等性设计
即便有手动确认,也可能因网络抖动等原因,消费者重复收到同一消息。所以消费者业务逻辑要设计成幂等的,也就是多次处理同一消息,结果与一次处理一致。比如数据库插入操作,可以先查询主键是否存在,存在则跳过,以此确保无论消息重发几次,数据状态都不会出错,间接防止消息丢失带来的数据不一致问题。

通过在生产者、消息队列中间件、消费者这三个关键链路环节层层设防,才能让消息在复杂的分布式系统流转中稳如泰山,保障业务流程顺畅无误。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2276464.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python爬虫-汽车之家各车系周销量榜数据

前言 本文是该专栏的第43篇,后面会持续分享python爬虫干货知识,记得关注。 在本专栏之前,笔者在文章《Python爬虫-汽车之家各车系月销量榜数据》中,有详细介绍,如何爬取“各车系车型的月销量榜单数据”的方法以及完整代码教学教程。 而本文,笔者同样以汽车之家平台为例,…

[C++]类与对象(上)

目录 💕1.C中结构体的优化 💕2.类的定义 💕3.类与结构体的不同点 💕4.访问限定符(public,private,protected) 💕5.类域 💕6.类的实例化 💕7.类的字节大小 💕8.类的字节大小特例…

Sprint Boot教程之五十八:动态启动/停止 Kafka 监听器

Spring Boot – 动态启动/停止 Kafka 监听器 当 Spring Boot 应用程序启动时,Kafka Listener 的默认行为是开始监听某个主题。但是,有些情况下我们不想在应用程序启动后立即启动它。 要动态启动或停止 Kafka Listener,我们需要三种主要方法…

《JavaWeb开发-javascript基础》

文章目录 《JavaWeb开发-javascript基础》1.javascript 引入方式2.JS-基础语法-书写语法2.1 书写语法2.2 输出语句 3.JS-基础语法-变量4.JS-基础语法-数据类型&运算符4.1 数据类型4.2 运算符4.3 数据类型转换 5. JS-函数6. JS-对象-Array数组7. JS-对象-String字符串8. JS-…

1.组件的三大组成部分注意点(结构/样式/逻辑)scoped解决样式冲突/data是一个函数2.组件通信组件通信语法父传子子传父

学习目标 1.组件的三大组成部分注意点(结构/样式/逻辑) scoped解决样式冲突/data是一个函数 2.组件通信 组件通信语法 父传子 子传父 非父子通信(扩展) 3.综合案例:小黑记事本(组件版) …

mapbox基础,expressions表达式汇总

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言二、🍀Expressions简介2.1 expressions 操作符2.1.1 Data expressions2.1.2 Camera expressions2.2 Expressi…

一文清晰梳理Mysql 数据库

现在处于大四上学期的阶段,在大四下学期即将要进行毕业设计,所以在毕业设计开始之前呢,先将Mysql 数据库有关知识进行了一个梳理,以防选题需要使用到数据库。 1)什么是数据库? 简单理解数据库&#xff0c…

基于大语言模型的组合优化

摘要:组合优化(Combinatorial Optimization, CO)对于提高工程应用的效率和性能至关重要。随着问题规模的增大和依赖关系的复杂化,找到最优解变得极具挑战性。在处理现实世界的工程问题时,基于纯数学推理的算法存在局限…

安装conda 环境

conda create -n my_unet5 python3.8 conda activate my_unet5

容器技术全面攻略:Docker的硬核玩法

文章背景 想象一下,一个项目终于要上线了,结果因为环境配置不一致,测试服务器一切正常,生产环境却宕机了。这是开发者噩梦的开始,也是Docker救世主角色的登场!Docker的出现颠覆了传统环境配置的方式&#…

LabVIEW部署Web服务

目录 LabVIEW部署Web服务1、创建项目2、创建Web服务3、新建WebVI3.1、使用GET方法3.2、使用POST方法 4、 部署和对应URL4.1、应用程序:80804.2、本地调试:80094.3、NI Web服务器:9090(禁用) 5、测试5.1、测试GET方法5.2、测试POST方法 6、实际…

STM32 : 波特率发生器

波特率发生器 1. 发送器和接收器的波特率 波特率寄存器 (BRR): 在串行通信中,发送器和接收器的波特率是由波特率寄存器(BRR)中的一个值 DIV 来确定的。 2. 计算公式 计算公式: 详细解释 1. 波特率寄存器 (BRR) BRR: 波特率寄存器是一…

Excel数据叠加生成新DataFrame:操作指南与案例

目录 一、准备工作 二、读取Excel文件 三、数据叠加 四、处理重复数据(可选) 五、保存新DataFrame到Excel文件 六、案例演示 七、注意事项 八、总结 在日常数据处理工作中,我们经常需要将不同Excel文档中的数据整合到一个新的DataFra…

基于微信小程序的汽车销售系统的设计与实现springboot+论文源码调试讲解

第4章 系统设计 一个成功设计的系统在内容上必定是丰富的,在系统外观或系统功能上必定是对用户友好的。所以为了提升系统的价值,吸引更多的访问者访问系统,以及让来访用户可以花费更多时间停留在系统上,则表明该系统设计得比较专…

C#调用OpenCvSharp实现图像的开运算和闭运算

对图像同时进行腐蚀和膨胀操作,顺序不同则效果也不同。先腐蚀后膨胀为开运算,能够消除小斑点和细小的突出物、平滑图像以及改善边缘;先膨胀后腐蚀为闭运算,能够去除噪点、填补图像孔洞、连接邻近物体和平滑物体边界。   OpenCvS…

从 SQL 语句到数据库操作

1. SQL 语句分类 数据定义语言 DDL : 用于定义或修改数据库中的结构,如:创建、修改、删除数据库对象。create、drop alter 数据操作语言 DML : 用于添加、删除、更新数据库中的数据。select、insert alter、drop 数据控制语言 D…

django在线考试系统

Django在线考试系统是一种基于Django框架开发的在线考试平台,它提供了完整的在线考试解决方案。 一、系统概述 Django在线考试系统旨在为用户提供便捷、高效的在线考试环境,满足教育机构、企业、个人等不同场景下的考试需求。通过该系统,用…

【Spring Boot 应用开发】-04-01 自动配置-数据源-连接池

资源关闭 还记得上一节中的这段代码么? try {if (resultSet ! null) resultSet.close();if (preparedStatement ! null) preparedStatement.close();if (connection ! null) connection.close(); } catch (SQLException e) {e.printStackTrace(); }这是我们在查询…

AngularJs指令中出错:Error: $compile:nonassign Non-Assignable Expression

Expression {resumeId: item.resumeId} used with directive rwdsDelete is non-assignable! 在AngularJS中,$compile服务用于将指令编译成HTML。当你在模板中使用了一个表达式,但这个表达式不是一个左值(即不能被赋值的表达式)时…

Docker 的安装和基本使用[SpringBoot之Docker实战系列] - 第535篇

历史文章(文章累计530) 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…