Kafka保证消息幂等以及解决方案

news2025/1/22 16:09:16

1、幂等的基本概念
幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。

查询操作(天然幂等)
查询一次和查询多次,在数据不变的情况下,查询结果是一样的。查询是天然的幂等操作删除操作 (天然幂等) 删除操作也是幂等的,删除一次和删除多次都是把数据删除(注意可能返回结果不一样,删除的数据不存在返回 0,删除的数据多条,返回结果多个)。

删除操作 (天然幂等)
删除操作也是幂等的,删除一次和删除多次都是把数据删除(注意可能返回结果不一样,删除的数据不存在, 返回 0,删除的数据多条,返回结果多个).

新增操作
新增操作,这种情况下多次请求,可能会产生重复数据;

修改操作
修改操作,如果只是单纯的更新数据,比如: update account set money=100 where id=1,是没有问题的,如果还有计算,比如: update account set money=money+100 where id=l,这种情况下多次请求,可能会导致数据错误。

总结:当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费者的处理过程就是幂等的。例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次扣费 100 元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

2、产生消息重复的原因
在可联网应用中,尤其在网络不稳定的情况下,消息队列的消息有可能会出现重复,如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下:

发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者生产者宕机,导致服务端对生产者应答失败。 如果此时生产者 Producer 意识到消息发送失败并尝试再次发送消息,消费者 Consumer 后续会收到两条内容相同的消息。

投递时消息重复 消息消费的场景下,消息已投递到消费者 Consumer 并完成业务处理,当消费者给服务端反馈应答的时候网络闪断,为了保证消息至少被消费一次,消息队列的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者 Consumer 后续会收到两条内容相同的消息;

负载均衡时消息重复 当消息队列的服务端或消费者重启、扩容或缩容时,都有可能会触发 rebalance,此时消费者 Consumer 可能会收到重复消息。

3、解决方案及案例分析
既然消息可能会产生重复,那如何解决消息幂等的问题呢?我们需要从生产者、中间件、消费者这几个不同层面,来保证消息的幂等,[消息的幂等业界有很多种方案,我这里列出常见的几种方案供大家参考

3.1 设置业务唯一 key 方案 (应用最广泛)
业务唯一key 可以是单个字段或者组合字段,这个方案是怎么实现的呢?

生产者消息休构造业务唯一 key,消息端针对这个 key 加分布式锁;

在消费端,创建一个消息防重表,利用插入记录唯一健约束控制, 但是这会与业务有一定的耦合,另外高并发下频繁对消息防重表进行操作,性能比较低,不太建议使用,我们通常是在消费端加一个redis分布式锁,防止短期内消息的重复投递;

数据库业务表加唯一索引 (数据库)

以用户在积分商城下单为例,具体业务流程如下:

1、客户发起支付流程;

2、生产者生产消息构造一个订单号作为消息体幂等的唯一 key;

3、发送消息给 broker,broker 持久化消息到磁盘;

4、消费者开始消费消息,在消费逻辑中加一个分布式锁,key为订单号,防止短时间内消息重复投递;

5、当加锁成功后,执行核心业务逻辑,然后释放分布式锁,当加锁失败,直接结束;

6、最后,为了防止后续生产者重复推送相同唯一key 的消息我们需要在数据库的业务表中给这个订单号加一个唯一索引,通过唯一健约束来保证数据库表不会出现两条相同的记录,从而实现消息幂等

3.2 设置业务唯一id方案
这个其实跟上一个方案类似,只是唯一id是需要我们通过 分布式id服务 生成,其他的处理方法跟上一个方案一样。

3.3 基于业务的状态机方案
在设计单据相关的业务,或者是任务相关的业务,肯定会涉及到状态机(状态变更图),我们以业务单据为例:在业务单据上面会有个状态,状态在不同的情况下会发生变更,一般情况下存在有限状态机,当消费业务消息的时候,如果状态机已经处于下一个状态,这时候来了一个上一个状态的消息,直接丢弃消息不处理,保证了有限状态机的幂等。

3.4 基于version版本号的乐观锁方案
此方案一般是适用于更新业务的场景,更新表的时候通过版本号对比来保证消息的幂等

具体业务流程
1、客户购买商品,完成后准备发送一条扣减账户200 的消息;

2、生产端开始生产消息,构造消息体{ id=1,money=200,version=1 };

3、发送消息给 broker,broker 持久化这条消息后,返回确认消息给生产者,此时时出现了网络闪断或者生产者宕机,导致 broker 对生产者响应失败, 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同的消息;

4、消费者收到相同的消息,开始消费第一条消息{id=1,money=200,version=1},根据 version=1 更新记录更新成功;

5、接着开始消费第二条消息{id=1,money=200,version=l },根据 version=1更新记录,但是此时 version 已经被更新为 2,条件不满足更新失败;

6、消费者通过基于version的乐观锁保证了消息幂等。

3.5 insert ... on duplicate key update 方案
此方案一般适合一些统计更新类的业务或者定时同步第三方平台数据到自己数据库的场景,例如: 定时同步企业微信的成员数据到自已企微库的成员表,就可以采用这种方案实现。

on dupdate key update 语句基本功能是:当表中没有原来记录时,就插入,有的话就更新。

1. on duplicate key update 语句根据主键id或唯一键来判断当前插入是否已存在。
2. 记录已存在时,只会更新on duplicate key update之后指定的字段。
3. 如果同时传递了主键和唯一键,以主键为判断存在依据,唯一键字段内容可以被修改。

具体业务流程:
1、张三关注某 A 公司企业微信,加人深圳区企微群;
2、管理员在深圳区企微群投放一个活动,张三第一次点击这个活动,这个时候活动模块发送一条 kafka 消息;
3、在数据库创建一张活动效果统计表,act_code 和 usr_id 两个字段作为联合唯一索引;
4、数据处理模块消费这条消息,通过 insert on duplicate key update 插人一条记录;
5、过了几小时,张三第二次点击这个活动,这个时候活动模块再发送一条 kafka 消息,数据处理模块再次消费这条消息,通过 insert...on duplicate key update 更新对应唯一索引的这条记录的更新时间字段;
6、通过 insert...on duplicate key update 命令,可以实现数据库表不会出现重复的记录,还能实现业务的更新逻辑。

补充:

消息消费失败的时候,可以做好监控报警,以便进行人工干预;
消费消息的方法,确保在同一个事务,以便消费失败的时候,可以回滚;
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1083637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何使用数字化系统赋能企业营销?数字化系统对于企业的作用?

数字化系统对于企业的营销活动能够提供多方面的帮助。 1. 数字化系统可以帮助企业更精准地了解客户的需求和行为,从而更好地定位产品和服务。如蚓链数字化营销系统可以通过数据分析和挖掘,帮助企业深入了解客户群体的特点和喜好,进而调整企业…

zabbix监控项

一、监控项(items) 1、获取监控数据的方式: ① zabbix-agent:代理程序是在被监控主机上运行的软件,负责收集和报告有关主机性能和状态的数据,监控系统通过与代理程序通信来获取数据。 ② SNMP&#xff1…

java基础 递归

递归注意事项: 递归算法遍历目录 递归算法删除目录: package wwx;import jdk.swing.interop.SwingInterOpUtils;import java.io.File; import java.io.IOException;public class Test {public static void main(String[] args) {File f new File(&quo…

【HarmonyOS】低代码平台组件拖拽使用技巧之滚动容器

【关键字】 HarmonyOS、低代码平台、组件拖拽、滚动容器 1、写在前面 上一篇中我们介绍了低代码平台组件库中的堆叠容器的一些拖拽技巧,本篇我们继续,今天带大家一些来看一个新的组件——滚动容器。关于滚动容器的直接使用其实很简单,所以这…

2023年医药商业行业发展研究报告

第一章 行业概况 1.1 定义 医药商业行业,作为医药领域的重要组成部分,扮演着至关重要的角色。这一行业专注于医药商品的经营与流通,确保药品能够有效、安全地到达消费者手中。随着医药科技的进步和市场需求的增长,医药商业行业在…

uni-app编程checkbox-group获取选中的每个checkbox的value值

uni-app编程checkbox-group获取选中的每个checkbox的value值_uniappcheckboxvalue-CSDN博客

前端-Vue-element-开发指南

电商管理系统框架 Vue-element 电商管理系统 电商管理系统框架1 介绍了解 项目实战的学习目标2 电商项目基本业务概述3 后台管理系统功能划分4 项目开发模式技术选型5 项目初始化可视化面板 配置 6 配置码云SSH7 托管云8 安装mysql9 API服务器Postman调试10 分析登陆和token原理…

信钰证券:买基金有哪些平台?

跟着人们理财意识的前进,出资基金成为了一种受欢迎的方法。但是关于一般出资者来说,在许多的基金公司平缓台中选择一个可靠的、确保出资安全的途径来购买基金却是一件扎手的作业。本文将从多个角度剖析不同的购买途径,以协助出资者作出更正确…

手写 分页

子组件&#xff1a;TimePage.vue 效果图 <template><div class"click-scroll-X"><!-- 上 --><!-- eslint-disable-next-line --><span class"left_btn" :disabled"pageNo 1" click"leftSlide"><&…

记录一次线上fullgc问题排查过程

某天&#xff0c;接到测试部门反馈说线上项目突然很快&#xff0c;由于当前版本代码和上一版本相比就多了一个刚上线了一个5分钟1次的跑批任务&#xff0c;先关闭次任务后观察是否卡顿&#xff0c;并检查堆内存是否使用完造成频繁gc 1.通过jmap命令查看堆内存中的对象 2.生成当…

解决:uniapp项目中调用小程序的chooseAddress() API失效

目录 问题描述 解决方案 问题描述 使用 Hbuilder X 编辑器和 uni-app 框架开发小程序项目&#xff0c;在调用小程序提供的 uni.chooseAddress() API实现选择收货地址的功能时&#xff0c;点击选择收货地址没有反应&#xff0c;获取不到用户收货地址&#xff0c;API失效了 …

【python获取.doc内表格指定单元格数据】

python获取.doc内表格指定单元格数据 需求说明代码部分 需求说明 读取.doc内表格中指定数据并以对象形式输出 1、获取第一张表内&#xff1a;部门编码、物料编码、退库数量(数字型)、物料质量问题描述、物料来源、填写人数据 2、获取第二张表格内&#xff1a;采购人员ID案例表…

简单列举客户关系管理系统的核心功能

CRM客户关系管理系统能够帮助企业收集、管理、维护客户信息&#xff0c;轻松跟进客户全过程&#xff0c;提供高质量的客户服务&#xff0c;通过数据分析为管理数据赋能。CRM系统能够让企业以客户为中心&#xff0c;实现全维度管理。那么&#xff0c;CRM系统的核心功能有哪些&am…

如何实现全网采集

品牌在控价过程中&#xff0c;需要对产品链接的价格进行监测&#xff0c;监测就要先做采集&#xff0c;采集完成后&#xff0c;再对数据进行促销信息的汇总计算&#xff0c;得出到手价&#xff0c;输出低价报表&#xff0c;从过程中可以看出&#xff0c;采集是非常关键的一步&a…

简单理解JS回调函数(callback)

一、回调函数到底是什么&#xff1f; 其实回调函数&#xff08;没有调用也会执行&#xff09;就是一个参数&#xff0c;把这个参数传到另一个函数里面&#xff0c;也就是主函数里面&#xff0c;当主函数里面的事情干完再回头去执行当做参数传进去的回调函数&#xff0c;回头去调…

Kelper.js 笔记 python交互

1 加载Kepler 地图 KeplerGl() 1.1 主要参数 height 可选 默认值&#xff1a;400 地图显示的高度 data 数据集 字典&#xff0c;键是数据集的名称 config地图配置字典 1.2 举例 from keplergl import KeplerGlmap_KeplerGl() map_ 默认的位置 1.3 添加自己的图 1.3.1 读…

关键词搜索苏宁商品列表数据,苏宁商品列表数据接口,苏宁API接口

在网页抓取方面&#xff0c;可以使用 Python、Java 等编程语言编写程序&#xff0c;通过模拟 HTTP 请求&#xff0c;获取苏宁网站上的商品页面。在数据提取方面&#xff0c;可以使用正则表达式、XPath 等方式从 HTML 代码中提取出有用的信息。值得注意的是&#xff0c;苏宁网站…

试过GPT-4V后,微软写了个166页的测评报告,业内人士:高级用户必读

一周之前&#xff0c;ChatGPT迎来重大更新&#xff0c;不管是 GPT-4 还是 GPT-3.5 模型&#xff0c;都可以基于图像进行分析和对话。与之对应的&#xff0c;多模态版GPT-4V模型相关文档也一并放出。当时 OpenAI 放出的文档只有18页&#xff0c;很多内容都无从得知&#xff0c;对…

作战仿真试验理论体系研究

源自&#xff1a;装甲兵工程学院学报 作者&#xff1a;徐享忠&#xff0c;杨建东&#xff0c;郭齐胜 “人工智能技术与咨询” 发布 摘要 建立了作战仿真试验的概念框架&#xff0c;以预测论、作战仿真理论和试验理论为基础理论&#xff0c;以仿真试验目标、仿真试验模式、…

Java基于SpringBoot+Vue的汽车租赁系统

1 简介 致远汽车租赁管理方面的任务繁琐,以至于公司每年都在致远汽车租赁管理这方面投入较多的精力却效果甚微,致远汽车租赁系统的目标就是为了能够缓解致远汽车租赁管理工作方面面临的压力,让致远汽车租赁管理方面的工作变得更加高效准确。 文章首发地址 2 技术栈 开发语言…