kafka如何保证消息不丢失 不重复消费 消息的顺序

news2025/1/10 22:28:18

如何保证消息的不丢失

消息为什么会丢失

在这里插入图片描述
想要保证消息不丢失就要首先知道消息为什么会丢失,在哪个环节会丢失,然后在丢失的环节做处理

1.生产者生产消息发送到broker,broker收到消息后会给生产者发送一个ack指令.生产者接收到broker发送成功的指令,这个时候我们就可以认为消息发送成功了.没有接收到ack指令我们就认为消息发送失败.

 public <T,Throwable> void sendEventByKafka(String topic, String content ,T t, KafkaSendErrorCallback<T, java.lang.Throwable> function) {
        kafkaTemplate.send(topic, content).addCallback(success -> {
            log.info("执行kafka消息发送kafka成功!");
            log.info(content);
        }, failure -> {
            log.error("执行kafka消息发送kafka失败!");
            //失败的消息保存到消息表
            function.saveMqDb(t,failure);
        });
    }

上述的逻辑有个前提条件就是,确定broker确实是接受并保存了消息.需要设置ack的级别
acks=0(不等待确认):
在这种模式下,生产者发送消息后不会等待来自Broker的任何确认。它会立即继续发送下一条消息。
这是最低延迟的选项,但也是最不可靠的,因为生产者无法知道消息是否已经成功到达Broker。
acks=1(Leader确认):
在这种模式下,生产者发送消息后会等待Broker的领导者(Leader)确认。领导者会确认消息已经被接收,但不一定已经被完全复制到所有的副本。
这种模式提供了一定程度的可靠性,因为生产者知道消息至少已经被领导者接收,但仍然可能丢失消息,因为它们可能还没有被复制到其他副本。
acks=all(全部确认):
这是最可靠的确认模式,在这种模式下,生产者发送消息后会等待所有的ISR(In-Sync Replicas,同步副本)确认。ISR是分区的所有副本中与领导者保持同步的副本集合。
在这种模式下,消息只有在被领导者和所有同步副本都确认接收后才被视为已提交。这确保了消息的可靠性。

如何保证不重复消费

2.不重复消费,在处理业务时,用唯一建来处理,如果没有唯一建,可以借助消息表来做,处理完了之后给这条消息打个已处理的标记.

.消费者接受消息处理业务给broker发送ack,broker认为消息消费成功,删除这条消息

    @KafkaListener(id = KafkaConstants.MESSAGE_GROUP, topics = KafkaConstants.MESSAGE_TOPIC, concurrency = "5")
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
       try{
           //处理业务数据
       }catch (Exception e){
           //消费失败后的处理,保存到消息表
       }finally {
           //ack确认
           acknowledgment.acknowledge();
       }

    }

如何保证消息的顺序

为什么顺序会乱.kafka在生产者生产消息的时候使我们代码控制的,可以保证顺序,比如付款成功后我先发送一个修改订单状态的消息,再发送一个扣减库存的消息,再发送一个物流通知的消息 一个topic 一个partion 代码写入的顺序就是消息的顺序.如果只有一个消费者监听一个partion也是可以保证顺序的.但是多个消费者监听同一个partion消费者2执行完成 消费者1.3还没有执行.这样顺序就乱了.
一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。(我们就是这么干的,相同的key的数据在一个队列里面,然后使用多线程开worker按照key不同进行分别消费)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1265191.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习早停机制(Early Stopping)与早退机制(Early exiting)

早停机制&#xff0c;一种机器学习模型调优策略&#xff0c;提升调优效率 下图损失值明显经过了欠拟合到过拟合 使用早停机制后&#xff0c;模型不再过拟合 模型早停是面向模型训练过程的。而在模型内部&#xff0c;也会出现类似的现象&#xff0c;这一现象被叫做过度思考(Ove…

Android Studio 模拟器设置独立窗口

目录 模拟器在窗口内部运行 设置成独立窗口 模拟器在窗口内部运行 操作起来十分不便 设置成独立窗口 Android Studio -> Preferences(Settings) -> Tools-> Emulator ->取消勾选 Launch in a tool window -> 点击右下角的 OK 按钮 -> 重启 Android Studio

0 NLP: 数据获取与EDA

0数据准备与分析 二分类任务&#xff0c;正负样本共计6W&#xff1b; 数据集下载 https://github.com/SophonPlus/ChineseNlpCorpus/raw/master/datasets/online_shopping_10_cats/online_shopping_10_cats.zip 样本的分布 正负样本中评论字段的长度 &#xff0c;超过500的都…

【Python基础】爬取豆瓣电影Top250+爬取知乎专栏文章标题

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…

一键删除方舟编译器缓存文件js、js.map插件ArkCompilerSupport

新手学习鸿蒙开发&#xff0c;发现DevEco Studio编译过种会生成js、js.map&#xff0c;在论坛上看了其它开发者也提了问题但无没解决&#xff0c;写了一个插件大家试下&#xff1a; https://plugins.jetbrains.com/plugin/23192-arkcompilersupport 源码&#xff1a;https://g…

服务器中深度学习环境的配置

安装流程 11.17 日&#xff0c;周末去高校参加学术会议&#xff0c;起因&#xff0c; 由于使用了某高校内的公共有线网络&#xff0c; 远程连接服务器后&#xff0c;黑客利用 ssh 开放的 22 端口&#xff0c; 篡改了主机的配置&#xff0c; 使得只要一连上网络&#xff0c; 服…

Python变量及其使用

无论使用什么语言编程&#xff0c;总要处理数据&#xff0c;处理数据就需要使用变量来保存数据。 形象地看&#xff0c;变量就像一个个小容器&#xff0c;用于“盛装”程序中的数据。常量同样也用于“盛装”程序中的数据。常量与变量的区别是&#xff1a;常量一旦保存某个数据…

全局配置

1.全局配置文件及其配置项 1.1.小程序窗口 1.2 窗口节点 1.2.1 导航栏标题 标题&#xff1a; 标题颜色&#xff1a; 背景色&#xff1a;只支持16进制值 下拉刷新&#xff1a; 刷新背景色&#xff1a; 刷新样式&#xff1a; 触底距离&#xff1a;

Docker 安装kafka 并创建topic 进行消息通信

Apache Kafka是一个分布式流处理平台&#xff0c;用于构建高性能、可扩展的实时数据流应用程序。本文将介绍如何使用Docker容器化技术来安装和配置Apache Kafka。 一、使用镜像安装 1、kafka安装必须先安装Zookpper 2、下载镜像 docker pull wurstmeister/kafka 3、查看下载…

(三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言Q1&#xff1a;卷积网络和传统网络的区别Q2:卷积神经网络的架构Q3:卷积神经网络中的参数共享&#xff0c;也是比传统网络的优势所在4、 具体的实现代码网络搭建…

美创科技受邀亮相第二届全球数字贸易博览会

11月23日-27日&#xff0c;由浙江省人民政府、商务部共同主办的第二届全球数字贸易博览会&#xff08;以下简称“数贸会”&#xff09;圆满落幕。围绕“国家级、国际性、数贸味”的目标定位&#xff0c;以“数字贸易 商通全球”为主题&#xff0c;数贸会重点展示数字贸易全产业…

哈希函数:保护数据完整性的关键

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

佳易王个体诊所管理系统电子处方软件,个体诊所人员服务软件,卫生室配方模板电子病历系统教程

佳易王个体诊所管理系统电子处方软件&#xff0c;个体诊所人员服务软件&#xff0c;卫生室配方模板电子病历系统教程 软件试用版下载可以点击最下方官网卡片 软件功能&#xff1a; 1、配方模板&#xff1a;可以自由添加配方分类&#xff0c;预先设置药品配方&#xff0c;可以…

字符串逆序问题

写一个函数&#xff0c;可以将任意输入的字符串逆序&#xff08;要可以满足多组输入&#xff09; 这个题有三个点 1.要读入键盘输入的字符串&#xff0c;所以要用到字符串输入函数 2.可以进行多组输入 3.把输入的n组字符串都逆序 #define _CRT_SECURE_NO_WARNINGS 1 #incl…

[栈迁移+ret滑梯]gyctf_2020_borrowstack

题目来源buuctf——gyctf_2020_borrowstack 参考链接https://www.shawroot.cc/2097.html 题目信息ubuntu16、64位 第一个read仅溢出一个机器字长&#xff0c;需要栈迁移 解题步骤栈偏移到全局变量bank中&#xff0c;ret2libcgadget 关键步骤 ret滑梯 第二个payload需要添…

Android flutter项目 启动优化实战(一)使用benchmark分析项目

背景描述 启动时间是用户对应用的第一印象&#xff0c;较慢的加载会对用户的留存和互动造成负面影响 在刚上线的B端项目中&#xff1a; 1.提高启动速度能提高整体流程的效率 2.提高首次运行速度能提高应用推广的初体验效果 问题描述 项目刚上线没多久、目前存在冷启动过程存在…

《融合SCADA系统数据的天然气管道泄漏多源感知技术研究》误报数据识别模型开发

数据处理不作表述。因为我用的是处理后的数据&#xff0c;数据点这。 文章目录 工作内容1CC040VFD电流VFD转速压缩机转速反馈进出口差压 紧急截断阀开到位进出电动阀开到位发球筒电筒阀开到位收球筒电动阀开到位电动阀2005开到位越站阀开到位 工作内容2工作内容3 工作内容1 任…

【Python 训练营】N_12 打印菱形图案

题目 打印菱形图案 分析 先把图形分成两部分来看待&#xff0c;前四行一个规律&#xff0c;后三行一个规律&#xff0c;利用双重for循环&#xff0c;第一层控制行&#xff0c;第二层控制列。 答案 # 方法一 for i in range(4):block **(2*i1)print({:^7}.format(block))…

web:NewsCenter

题目 打开页面显示如下 页面有个输入框&#xff0c;猜测是sql注入&#xff0c;即search为注入参数点&#xff0c;先尝试一下 返回空白显示错误 正常显示如下 是因为单引号与服务端代码中的’形成闭合&#xff0c;输入的字符串hello包裹&#xff0c;服务端代码后面多出来一个‘导…