如何保证消息只被消费一次

news2025/1/11 9:06:37

目录

前言

一、什么是幂等?

二、在生产过程中增加消息幂等性的保证

三、在消费过程中增加消息幂等性的保证 


前言

消息一旦被重复消费,就会造成业务逻辑处理的错误。那么我们要如何避免消息的重复呢?

想要完全的避免消息重复的发生是很难做到的,因为网络的抖动、机器的宕机和处理的异常都是比较难以避免的,在工业上并没有成熟的方法,因此我们会把要求放宽,只要保证即使消费到了重复的消息,从消费的最终结果来看和只消费一次是等同的就好了,也就是保证在消息的生产和消费的过程是“幂等”的。


一、什么是幂等

幂等是一个数学上的概念,它的含义是多次执行同一个操作和执行一次操作,最终得到的结果是相同的。

如果我们消费一条消息的时候,要给现有的库存数量减 1,那么如果消费两条相同的消息就会给库存数量减 2,这就不是幂等的。而如果消费一条消息后,处理逻辑是将库存的数量设置为 0,或者是如果当前库存数量是 10 时则减 1,这样在消费多条消息时,所得到的结果就是相同的,这就是幂等的。

二、在生产过程中增加消息幂等性的保证

在消息生产过程中,在 Kafka0.11 版本和 Pulsar 中都支持“producer idempotency”的特性,翻译过来就是生产过程的幂等性,这种特性保证消息虽然可能在生产端产生重复,但是最终在消息队列存储时只会存储一份。

它的做法是给每一个生产者一个唯一的 ID,并且为生产的每一条消息赋予一个唯一 ID,消息队列的服务端会存储 < 生产者 ID,最后一条消息 ID> 的映射。当某一个生产者产生新的消息时,消息队列服务端会比对消息 ID 是否与存储的最后一条 ID 一致,如果一致,就认为是重复的消息,服务端会自动丢弃。

三、在消费过程中增加消息幂等性的保证 

而在消费端,幂等性的保证会稍微复杂一些,你可以从通用层和业务层两个层面来考虑。在通用层面,你可以在消息被生产的时候,使用发号器给它生成一个全局唯一的消息 ID,消息被处理之后,把这个 ID 存储在数据库中,在处理下一条消息之前,先从数据库里面查询这个全局 ID 是否被消费过,如果被消费过就放弃消费。

你可以看到,无论是生产端的幂等性保证方式,还是消费端通用的幂等性保证方式,它们的共同特点都是为每一个消息生成一个唯一的 ID,然后在使用这个消息的时候,先比对这个ID 是否已经存在,如果存在,则认为消息已经被使用过。所以这种方式是一种标准的实现幂等的方式,你在项目之中可以拿来直接使用,它在逻辑上的伪代码就像下面这样:

     boolean isIDExisted = selectByID(ID); // 判断 ID 是否存在
     if(isIDExisted) {  
        return; // 存在则直接返回
     } else {  
        process(message); // 不存在,则处理消息
         saveID(ID); // 存储 ID  
    }

不过这样会有一个问题:如果消息在处理之后,还没有来得及写入数据库,消费者宕机了重启之后发现数据库中并没有这条消息,还是会重复执行两次消费逻辑,这时你就需要引入事务机制,保证消息处理和写入数据库必须同时成功或者同时失败,但是这样消息处理的成本就更高了,所以,如果对于消息重复没有特别严格的要求,可以直接使用这种通用的方案,而不考虑引入事务。

在业务层面怎么处理呢?这里有很多种处理方式,其中有一种是增加乐观锁的方式。比如,你的消息处理程序需要给一个人的账号加钱,那么你可以通过乐观锁的方式来解决。具体的操作方式是这样的:你给每个人的账号数据中增加一个版本号的字段,在生产消息时先查询这个账户的版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额 SQL 的时候带上版本号,类似于执行:

update user set amount = amount + 20, version=version+1 where userId=1 
and version=1

你看,我们在更新数据时给数据加了乐观锁,这样在消费第一条消息时,version 值为 1,SQL 可以执行成功,并且同时把 version 值改为了 2;在执行第二条相同的消息时,由于version 值不再是 1,所以这条 SQL 不能执行成功,也就保证了消息的幂等性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1170771.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

全方位移动机器人 SolidWorks 转 URDF 并在 Rviz 中仿真

全方位移动机器人 SolidWorks 转 URDF 并在 Rviz 中仿真 参考 solidworks转URDF&#xff0c;并且在rviz中仿真 从solidworks导出URDF模型 Export a SolidWorks Assembly to URDF Solidworks模型导出urdf SolidWorks 模型简化 将整车除车轮部分另存为零件&#xff0c;作为一个…

MySQL连接时出现Host ‘::1‘ is not allowed to connect to this MySQL server

报错原因 之前想着要提高一下连接速度&#xff0c;所以在my.ini中加入了&#xff1a;skip-name-resolve&#xff0c;当时的数据库root账号设置的登录权限是%&#xff0c;因此没有出现连接错误&#xff0c;这次因为是新建数据库&#xff0c;root账号的登录权限默认是localhost&…

如何基于链表与数组实现栈

这里写目录标题 栈的基础知识基于数组实现栈基于链表实现栈 栈的基础知识 栈&#xff0c;又名堆栈&#xff0c;是一种受限的线性表&#xff0c;这意味着该线性表只能在一段进行插入或删除操作。具体来说&#xff0c;栈顶是允许进行插入或删除操作的一端&#xff0c;而相对的另…

零基础入门网络安全白帽黑客,挑战年薪30w!

最近好朋友老李说他想转渗透测试。 他说&#xff1a;运维这块干了3年了&#xff0c;感觉自己目前有点迷茫&#xff0c;不知该怎么去提升了。而在日常工作生活当中&#xff0c;黑客攻击可以说是很常见了&#xff0c;他感觉到网络安全越来越重要&#xff0c;对软件测试的要求也不…

大疆Livox MID-360安装ROS1/2驱动 Ubuntu20.04

文章目录 一、接线连接二、安装上位机可视化工具三、安装ROS驱动3.1 配置静态IP3.2 安装Livox SDK23.3 安装ROS驱动3.4 驱动 本文介绍如何在Ubuntu20.04中安装大疆Livox MID-360的ROS1/2驱动 一、接线连接 livox航插一分三线&#xff0c;其中航空母头连接激光雷达&#xff0c…

Spring | Sring Task (定时任务框架) 、微信小程序开发

目录&#xff1a; 一、Sring Task (定时任务框架) &#xff1a;Sring Task介绍Spring Task应用场景corn表达式corn表达式在线生成器SpringTask入门案例&#xff1a;导入maven依赖启动类上添加 EnableScheduling 注解定时方法上添加 Scheduled( cron “xxxxx” ) 注解自定义“定…

ZKP Introduction of Nova (Yu Guo) 手写笔记

ZKP学习笔记 郭宇老师Nova课程手写笔记

创建asp.net api和docker-compose项目

vs2022创建asp.net core web api项目 创建完成 添加docker-compose支持 添加成功 docker配置 docker-compose配置

腾讯云3年轻量2核2G4M服务器从366.6元三年涨价了?

2023腾讯云双11优惠活动3年轻量应用服务器涨价了&#xff1f;确实是涨价了&#xff0c;仅限于三年时长轻量应用服务器&#xff0c;一年时长并没有涨价&#xff0c;相比隔壁阿里云&#xff0c;腾讯云依旧在提供三年轻量应用服务器和5年时长云服务器CVM已经很难得了&#xff0c;想…

集线器、交换机、网桥、路由器、网关

目录 集线器(HUB)交换机(SWITCH)网桥(BRIDGE)路由器(ROUTER)网关(GATEWAY)交换机和路由器的区别参考 集线器(HUB) 功能 集线器对数据的传输起到同步、放大和整形的作用 属于物理层设备 工作机制 使用集线器互连而成的以太网被称为共享式以太网。当某个主机要给另一个主机发送单…

如何使用 SwiftUI 中新地图框架 MapKit

文章目录 前言MapKit 弃用项MapContentBuilder&#xff08;iOS 17&#xff09;地图交互地图样式地图控件地图相机位置总结 前言 了解 iOS 17 中的 MapKit 后&#xff0c;我们会发现 Apple 引入了更适合 SwiftUI 的 API。 MapKit 弃用项 一旦将你的 App 目标更新到 iOS 17&am…

OpenGL ES入门教程(二)之绘制一个平面桌子

OpenGL ES入门教程&#xff08;二&#xff09;之绘制一个平面桌子 前言0. OpenGL绘制图形的整体框架概述1. 定义顶点2. 定义着色器3. 加载着色器4. 编译着色器5. 将着色器链接为OpenGL程序对象6. 将着色器需要的数据与拷贝到本地的数组相关联7. 在屏幕上绘制图形8. 让桌子有边框…

立创eda 焊接辅助工具使用

立创EDA为板级EDA设计软件。EDA指的是通过计算机的辅助完成电路原理图、印刷电路板文件等的绘制、制作、仿真设计。 立创EDA是一款基于浏览器的&#xff0c;专为中国人设计的&#xff0c;友好易用的EDA设计工具。起于2010年&#xff0c;完全由中国人独立开发&#xff0c;拥有独…

Ubuntu下安装vscode,并解决终端打不开vscode的问题

Visual Studio Code安装 1&#xff0c;使用 apt 安装 Visual Studio Code 在官方的微软 Apt 源仓库中可用。按照下面的步骤进行即可&#xff1a; 以 sudo 用户身份运行下面的命令&#xff0c;更新软件包索引&#xff0c;并且安装依赖软件&#xff1a; sudo apt update sud…

女孩子穿这种粉粉嫩嫩~的卫衣也太好看了吧

果然女孩子穿这种粉粉嫩嫩的衣服 真的超级有甜美可爱氛围哎 软糯亲肤的面料&#xff0c;上身很舒服哦 时尚polo领加上半拉链设计 既实用又美观&#xff0c;穿脱很方便

如何使用Selenium处理Cookie,今天彻底学会了!

01、cookie介绍 HTTP协议是无状态的协议。一旦数据交换完毕&#xff0c;客户端与服务器端的连接就会关闭&#xff0c;再次交换数据需要建立新的连接&#xff0c;这就意味着服务器无法从连接上跟踪会话。也就是说即使第一次和服务器连接后并且登录成功后&#xff0c;第二次请求…

进程终止(不同情况+如何查看:strerror,echo $?),终止的方法(return,exit,_exit),exit和_exit的不同

目录 进程终止 进程终止是什么 进程终止的情况 代码跑完,结果正确/不正确 提前知道结果 不知道结果 strerror 示例 -- echo $? 代码未跑完,程序崩溃 示例 进程退出方法 return退出码 exit(status) _exit(status) exit()和_exit()的不同 示例 缓冲区位置 进…

华为云RDS数据库(Mysql)不买公网IP无法Navicate连接

前言 最近公司有一个项目甲方为了便宜购买了华为云的ECS服务器与RDS云数据库&#xff08;Mysql&#xff09;进行项目部署&#xff0c;实际部署数据库时发现&#xff0c;华为云的数据库需要购买公网IP才能使用Navicate连接数据库&#xff08;不可思议的我还提交工单确认了一下以…

FlexmonsterPivotTable-2.9.63 LICENSE

FlexmonsterPivotTable-v2.9.63用于网络报告的数据透视表组件&#xff0c;用于可视化业务数据的最强大的 JavaScript 工具 与任何技术堆栈集成 该组件可与任何技术堆栈无缝协作&#xff1a; 与Angular、React、jQuery、Vue等 完美集成 没有服务器端依赖项 只需几行代码 即可开始…

ROS笔记之TF坐标变换

ROS笔记之TF坐标变换 code review! 文章目录 ROS笔记之TF坐标变换一些相关函数的用法tf::TransFormBroadcaster tf1; tf1.sendTransform()tf::StampedTransform()tf::Transform()tf::Vector3()详解br.sendTransform(tf::StampedTransform(tf::Transform(tf::Quaternion::getI…