【Kafka】Kafka的重复消费和消息丢失问题

news2025/1/19 11:06:48

文章目录

    • 前言
    • 一、重复消费
      • 1.1 重复消费出现的场景
        • 1.1.1 Consumer消费过程中,进程挂掉/异常退出
        • 1.1.2 消费者消费时间过长
      • 1.2 重复消费解决方案
        • 1.2.1 针对于消费端挂掉等原因造成的重复消费问题
        • 1.2.2 针对于Consumer消费时间过长带来的重复消费问题
    • 二、消息丢失
      • 2.1 生产端问题
      • 2.2 消费端问题
    • 三、参考

前言

在Kafka中,生产者(Producer)和消费者(Consumer)是通过发布订阅模式进行协作的,生产者将消息发送到Kafka集群,而消费者从Kafka集群中拉取消息进行消费,无论是生产者发送消息到Kafka集群还是消费者从Kafka集群中拉取消息进行消费,都是容易出现问题的,比较典型的就是消费端的重复消费问题、生产端和消费端产生的消息丢失问题。下面将对这两个问题出现的场景以及常见的解决方案进行讲解。

一、重复消费

1.1 重复消费出现的场景

重复消费出现的常见场景主要分为两种,一种是 Consumer在消费过程中,应用进程被强制kill掉或者发生异常退出(挂掉…),另一种则是Consumer消费的时间过长。

1.1.1 Consumer消费过程中,进程挂掉/异常退出

在Kafka消费端的使用中,位移(Offset)的提交有两种方式,自动提交和手动提交。自动提交情况下,当消费者拉取一批消息进行消费后,需要进行Offset的提交,在消费端提交Offset之前,Consumer挂掉了,当Consumer重启后再次拉取Offset,这时候拉取的依然是挂掉之前消费的Offset,因此造成重复消费的问题。在手动提交模式下,在提交代码调用之前,Consumer挂掉也会造成重复消费。

1.1.2 消费者消费时间过长

Kafka消费端的参数max.poll.interval.ms定义了两次poll的最大间隔,它的默认值是 5 分钟,表示 Consumer 如果在 5 分钟之内无法消费完 poll方法返回的消息,那么Consumer 会主动发起“离开组”的请求。

在离开消费组后,开始Rebalance,因此提交Offset失败。之后重新Rebalance,消费者再次分配Partition后,再次poll拉取消息依然从之前消费过的消息处开始消费,这样就造成重复消费。而且若不解决消费单次消费时间过长的问题,这部分消息可能会一直重复消费。

整体上来说,如果我们在消费中将消息数据处理入库,但是在执行Offset提交时,Kafka宕机或者网络原因等无法提交Offset,当我们重启服务或者Rebalance过程触发,Consumer将再次消费此消息数据。

1.2 重复消费解决方案

1.2.1 针对于消费端挂掉等原因造成的重复消费问题

这部分主要集中在消费端的编码层面,需要我们在设计代码时以幂等性的角度进行开发设计,保证同一数据无论进行多少次消费,所造成的结果都一样。处理方式可以在消息体中添加唯一标识(比如将消息生成md5保存到Mysql或者是Redis中,在处理消息前先检查下Mysql/Redis是否已经处理过该消息了),消费端进行确认此唯一标识是否已经消费过,如果消费过,则不进行之后处理。从而尽可能的避免了重复消费。

1.2.2 针对于Consumer消费时间过长带来的重复消费问题

  • 提高单条消息的处理速度。例如对消息处理中比较耗时的操作可通过异步的方式进行处理、利用多线程处理等。
  • 其次,在缩短单条消息消费时常的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。

二、消息丢失

在Kafka中,消息丢失在Kafka的生产端和消费端都会出现。在此之前我们先来了解一下生产者和消费者的原理。

2.1 生产端问题

生产者原理
Kafka生产者生产消息后,会将消息发送到Kafka集群的Leader中,然后Kafka集群的Leader收到消息后会返回ACK确认消息给生产者Producer。主要拆解为以下几个步骤。

  • Producer先从Kafka集群找到该Partition的Leader。
  • Producer将消息发送给Leader,Leader将该消息写入本地。
  • Follwer从Leader pull消息,写入本地Log后Leader发送ACK。
  • Leader 收到所有 ISR 中的 Replica 的 ACK 后,增加High Watermark,并向 Producer 发送 ACK。
    在这里插入图片描述
    因此,Kafka集群(其实是分区的Leader)最终会返回一个ACK来确认Producer推送消息的结果,这里Kafka提供了三种模式:
  • NoResponse RequiredAcks = 0:这个代表的就是不进行消息推送是否成功的确认。
  • WaitForLocal RequiredAcks = 1:当local(Leader)确认接收成功后,就可以返回了。
  • WaitForAll RequiredAcks = -1:当所有的Leader和Follower都接收成功时,才会返回。

因此这个配置的影响也分为下面三种情况:

  • 设置为0,Producer不进行消息发送的确认,Kafka集群(Broker)可能由于一些原因并没有收到对应消息,从而引起消息丢失。
  • 设置为1,Producer在确认到 Topic Leader 已经接收到消息后,完成发送,此时有可能 Follower 并没有接收到对应消息。此时如果 Leader 突然宕机,在经过选举之后,没有接到消息的 Follower 晋升为 Leader,从而引起消息丢失。
  • 设置为-1,可以很好的确认Kafka集群是否已经完成消息的接收和本地化存储,并且可以在Producer发送失败时进行重试。

生产端解决消息丢失方案:

  • 通过设置RequiredAcks模式来解决,选用WaitForAll(对应值为-1)可以保证数据推送成功,不过会影响延时。
  • 引入重试机制,设置重试次数和重试间隔。
  • 当然,最后就是使用Kafka的多副本机制保证Kafka集群本身的可靠性,确保当Leader挂掉之后能进行Follower选举晋升为新的Leader。

2.2 消费端问题

消费端的消息丢失问题
消费端的消息丢失主要是因为在消费过程中出现了异常,但是对应消息的 Offset 已经提交,那么消费异常的消息将会丢失。
前面介绍过,Offset的提交包括手动提交和自动提交,可通过kafka.consumer.enable-auto-commit进行配置。
手动提交可以灵活的确认是否将本次消费数据的Offset进行提交,可以很好的避免消息丢失的情况。
自动提交是引起消息丢失的主要诱因。因为消息的消费并不会影响到Offset的提交。
大部分的解决方案为了尽可能的保证数据的完整性,都是尽量去选用手动提交的方式,当数据处理完之后再进行提交。
当然,在golang中我们主要使用sarama包的Kafka,sarama自动提交的原理是先进行标记,再进行提交,如下代码所示:

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("Message topic:%q partition:%d offset:%d
", msg.Topic, msg.Partition, msg.Offset)
      // 标记消息已处理,sarama会自动提交
      // 处理数据(如真正持久化mysql...)
      sess.MarkMessage(msg, "")
   }
   return nil

因此,我们完全可以在标记之前进行数据的处理,例如插入Mysql等,当出现插入成功后程序崩溃,下一次最多重复消费一次(因为还没标记,Offset没有提交),而不会因为Offset超前,导致应用层消息丢失了。

手动提交模式下当然是很灵活的控制的,但确实已经没必要了:

consumerConfig := sarama.NewConfig()
consumerConfig.Version = sarama.V2_8_0_0
consumerConfig.Consumer.Return.Errors = false
consumerConfig.Consumer.Offsets.AutoCommit.Enable = false  // 禁用自动提交,改为手动
consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
 
func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s
", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
 
      // 插入mysql....
      // 手动提交模式下,也需要先进行标记
      sess.MarkMessage(msg, "")
      consumerCount++
      if consumerCount%3 == 0 {
         // 手动提交,不能频繁调用
         t1 := time.Now().Nanosecond()
         sess.Commit()
         t2 := time.Now().Nanosecond()
         fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms")
      }
   }
   return nil
}

三、参考

1、Go语言如何操纵Kafka保证无消息丢失
2、kafka消息重复消费和消息丢失
3、sarama Kafka客户端生产者与消费者梳理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/47975.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Multipass,多平台本地轻量级Linux体验!

Multipass 是由Ubuntu官方提供,在Linux,MacOS和Windows上快速生成 Ubuntu虚拟机 的工具。它提供了一个简单但功能强大的CLI,可让我们在本地快速进入Ubuntu系统环境并使用Linux命令,亦可以在本地计算机创建自己的迷你型云服务器。总…

zabbix监控网络连接状态

目录 一、环境准备 二、网络连接状态介绍 三、自定义监控key 四、给主机添加监控项 一、环境准备 1、搭建zabbix基础环境 zabbix基础环境部署参照:zabbix基础环境部署_桂安俊kylinOS的博客-CSDN博客 2、给web1安装nginx环境,并加载status模块 以…

SpringCloud学习笔记 - 链路监控 - SpringCloud Sleuth

1. Sleuth简介 在微服务框架中,一个由客户端发起的请求,在后端系统中会调用多个不同的的服务节点,来协同产生最后的响应结果,因此每一个前端请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错…

信号量Semaphore详解

前言 大家应该都用过synchronized 关键字加锁,用来保证某个时刻只允许一个线程运行。那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信号量Semaphore。 介绍和使用 Semaphore(信号量)可以用…

iwebsec靶场 SQL注入漏洞通关笔记10- 双重url编码绕过

系列文章目录 iwebsec靶场 SQL注入漏洞通关笔记1- 数字型注入_mooyuan的博客-CSDN博客 iwebsec靶场 SQL注入漏洞通关笔记2- 字符型注入(宽字节注入)_mooyuan的博客-CSDN博客 iwebsec靶场 SQL注入漏洞通关笔记3- bool注入(布尔型盲注&#…

VF01销售开票发票金额控制增强

实施隐式增强 全部代码如下: method IF_EX_BADI_SD_BILLING~INVOICE_DOCUMENT_CHECK. CALL FUNCTION ‘SIPT_DOC_CHECK_SD’ EXPORTING it_xvbrk fxvbrk it_xvbrp fxvbrp it_xkomv fxkomv it_xvbpa fxvbpa IMPORTING ev_bad_data fbad_data. “”“”“”“…

【LeetCode每日一题】——171.Excel 表列序号

文章目录一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【解题思路】七【题目提示】八【时间频度】九【代码实现】十【提交结果】一【题目类别】 字符串 二【题目难度】 简单 三【题目编号】 171.Excel 表列序号 四【题目描述】 给你一个字…

SecXOps 技术发展趋势

可以预见,安全智能分析技术的发展,将全面提升网络安全 关键应用场景下威胁检测的效果,推动安全分析从基础级、领先级,向卓越级演进。根据 Gartner 2021 年十大数据和分 析技术趋势,XOps 的目标是利用 DevOps 最佳实践实…

计算机网络性能指标——时延,时延带宽积,RTT和利用率

时延 时延:数据(报文、分组、比特流)从网络的一端传送到另一端所需要的时间。也叫延迟或迟延,单位是s。 时延包括:发送时延(传输时延),传播时延,排队时延,处理时延。 高…

2022年最新安徽食品安全管理员模拟试题及答案

百分百题库提供食品安全管理员考试试题、食品安全管理员考试预测题、食品安全管理员考试真题、食品安全管理员证考试题库等,提供在线做题刷题,在线模拟考试,助你考试轻松过关。 1.引起副溶血弧菌食物中毒的主要食品是: A.罐头食品…

公司新来一个同事,把网关系统设计的炉火纯青,万能通用,稳的一批。。

本文准备围绕七个点来讲网关,分别是网关的基本概念、网关设计思路、网关设计重点、流量网关、业务网关、常见网关对比,对基础概念熟悉的朋友可以根据目录查看自己感兴趣的部分。 什么是网关 网关,很多地方将网关比如成门, 没什么…

操作系统学习(九)死锁

目录 学习建议: 基本内容: 一、死锁的形成: 二、死锁的必要条件: 三、资源分配图: 四、解决死锁问题的几个策略: 五、死锁的防止: 1.互斥条件: 2.占有并等待资源&#xff1a…

[NCTF2019]SQLi

进来就有个弹窗 甚至给了sql语句 sqlquery : select * from users where username and passwd 先扫一下目录,发现有个robots.txt 提示有个hint.txt $black_list "/limit|by|substr|mid|,|admin|benchmark|like|or|char|union|substring|select|greatest|%00…

特种品种权限开通和豁免

目前国内期货市场,许多商品期货品种需要特殊开户流程。主要有7个(未包含期权以及金融期货):铁矿石、PTA、20号胶、低硫燃料油、棕榈油、原油、国际铜。 想要交易铁矿石,首先需要拥有一个期货账户,其次再向…

TCL 基于 StarRocks 构建统一的数据分析平台

作者:陈树煌,TCL 实业数据管理部副总监(本文为作者在 StarRocks Summit Asia 2022 上的分享) 作为伴随改革开放浪潮成长起来的中国领先电子企业,TCL 拥有 13 万员工,业务遍及 160 多个国家和地区&#xff…

[附源码]计算机毕业设计springboot公共台账管理系统

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

【重温C++ Primer】第一章、初识C++

目录前言一、编写一个简单的c程序二、编译、运行程序三、初识输入输出四、注释类型五、控制流5.1、循环语句:while for5.2、读取数量不定的输入数据5.2、if 语句六、类Reference前言 上次学C还是在大一的时候,一直都想好好的温习一下C。刚好最后被隔离…

CS5263数据手册|CS5263替代PS176|DP转HDMI2.0芯片设计资料

CS5263替代PS176用于DP转HDMI2.0转换器芯片,DP转HDMI 4K60HZ转换方案芯片,CS5263芯片管脚分布及定义: CS5263芯片DEMO功能演示板如下: CS5263替代PS176支持HDCP 1.4和HDCP2.3,带有支持HDCP中继器的片上键。用于Typec扩…

电脑怎么提取图片中的文字?

图片记录着我们生活的点点滴滴,比如各种办公截图、查快递单号、布置的课堂作业等等,都离不开这种便捷的方法。而我们有时难免需要从图片中提取想要的文字,总不能就靠打字打到手软吧,那么电脑怎么提取图片中的文字呢?有需要的朋友…

基于改进粒子群算法的微电网多目标优化调度(Matlab代码实现)

👨‍🎓个人主页:研学社的博客 💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜…