Spring-Kafka 3.0 消费者消费失败处理方案

news2025/1/16 2:32:58

一、背景

我们作为Kafka在使用Kafka是,必然考虑消息消费失败的重试次数,重试后仍然失败如何处理,要么阻塞,要么丢弃,或者保存

二、设置消费失败重试次数

1 默认重试次数在哪里看

Kafka3.0 版本默认失败重试次数为10次,准确讲应该是1次正常调用+9次重试,这个在这个类可以看到 org.springframework.kafka.listener.SeekUtils

2 如何修改重试次数

据我的实验,spring-kafka3.0版本通过application.yml 配置是行不通的,也没有找到任何一项配置可以改重试次数的(网上很多说的通过配置spring.kafka.consumer.retries 可以配置,我尝试过了,至少3.0版本是不行的,如果有人成功试过可以通过application.yml 配置消费者的消费的重试次数可以留言通知我,谢谢)

经过我不懈努力和尝试,只能通过Java代码配置的方式才可以,并且这种方式相对于application.yml配置更加灵活细致,上代码

    public CommonErrorHandler commonErrorHandler() {
        BackOff backOff = new FixedBackOff(5000L, 3L);
        return  new DefaultErrorHandler(backOff);
    }

然后把这个handler 添加到ConcurrentKafkaListenerContainerFactory中就行了

三、设置消费失败处理方式

1 保存到数据库重试

我们需要在创建DefaultErrorHandler类时加入一个ConsumerAwareRecordRecoverer参数就可以了,这样在重试3次后仍然失败就会保存到数据库中,注意这里save to db成功之后,我认为没有必要执行consumer.commitSync方法,首先这个consumer.commitSync这个方法默认是提交当前批次的最大的offset(可能会导致丢失消息),其次不提交Kafka的消费者仍然回去消费后面的消息,只要后面的消息,消费成功了,那么依然会提交offset,覆盖了这个offset

    public CommonErrorHandler commonErrorHandler() {
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(5000L, 3L);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
            log.info("save to db " + record.value().toString());
        }, backOff);
        return defaultErrorHandler;
    }

如果你硬要提交也可以试试下面这种,指定提交当前的offset

    public CommonErrorHandler commonErrorHandler() {
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(5000L, 3L);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
            log.info("save to db " + record.value().toString());
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()));
            consumer.commitSync(offsets);
        }, backOff);
        return defaultErrorHandler;
    }

2 发送到Kafka死信队列

仍然在创建DefaultErrorHandler类时加入一个DeadLetterPublishingRecoverer 类就行了,默认会把消息发到kafkaTemplate 配置的topic名字为your_topic+.DLT

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
   

 public CommonErrorHandler commonErrorHandler() {
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(5000L, 3L);

        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);

        return defaultErrorHandler;
    }
ConsumerRecordRecoverer 接口总共就这2种实现方式

四、整体消费者代码粘贴

1 application.yml

kafka-consumer:
  bootstrapServers: 192.168.31.114:9092
  groupId: goods-center
  #后台的心跳线程必须在30秒之内提交心跳,否则会reBalance
  sessionTimeOut: 30000
  autoOffsetReset: latest
  #取消自动提交,即便如此 spring会帮助我们自动提交
  enableAutoCommit: false
  #自动提交间隔
  autoCommitInterval: 1000
  #拉取的最小字节
  fetchMinSize: 1
  #拉去最小字节的最大等待时间
  fetchMaxWait: 500
  maxPollRecords: 50
  #300秒的提交间隔,如果程序大于300秒提交,会报错
  maxPollInterval: 300000
  #心跳间隔
  heartbeatInterval: 10000
  keyDeserializer: org.apache.kafka.common.serialization.LongDeserializer
  valueDeserializer: org.springframework.kafka.support.serializer.JsonDeserializer

2 KafkaListenerProperties

package com.ychen.goodscenter.fafka;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
//指定配置文件的前缀
@ConfigurationProperties(prefix = "kafka-consumer")
@Getter
@Setter
public class KafkaListenerProperties {
 
    private String groupId;
 
    private String sessionTimeOut;
 
    private String bootstrapServers;
 
    private String autoOffsetReset;
 
    private boolean enableAutoCommit;
 
    private String autoCommitInterval;
 
    private String fetchMinSize;
 
    private String fetchMaxWait;
 
    private String maxPollRecords;
 
    private String maxPollInterval;
 
    private String heartbeatInterval;
 
    private String keyDeserializer;
 
    private String valueDeserializer;
 
}

3 KafkaConsumerConfig

package com.ychen.goodscenter.fafka;

import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableConfigurationProperties(KafkaListenerProperties.class)
@Slf4j
public class KafkaConsumerConfig {
    @Autowired
    private KafkaListenerProperties kafkaListenerProperties;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 并发数 多个微服务实例会均分
        factory.setConcurrency(2);
//        factory.setBatchListener(true);
        factory.setCommonErrorHandler(commonErrorHandler());

        ContainerProperties containerProperties = factory.getContainerProperties();
        // 是否设置手动提交
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }


    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("消费者的配置信息:{}", JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }


    public CommonErrorHandler commonErrorHandler() {
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(5000L, 3L);

        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
//        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
//            log.info("save to db " + record.value().toString());
//            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
//            offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()));
//            consumer.commitSync(offsets);
//        }, backOff);
        return defaultErrorHandler;
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // 服务器地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrapServers());
        // 是否自动提交
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerProperties.isEnableAutoCommit());
        // 自动提交间隔
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getAutoCommitInterval());

        //会话时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerProperties.getSessionTimeOut());
        //key序列化
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getKeyDeserializer());
        //value序列化
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getValueDeserializer());
        // 心跳时间
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getHeartbeatInterval());

        // 分组id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroupId());
        //消费策略
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerProperties.getAutoOffsetReset());
        // poll记录数
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerProperties.getMaxPollRecords());
        //poll时间
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaListenerProperties.getMaxPollInterval());

        propsMap.put("spring.json.trusted.packages", "com.ychen.**");

        return propsMap;
    }


}

4 MessageListener

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic", containerFactory = "kafkaListenerContainerFactory")
    public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
        log.info("order-message-topic message Listener, Thread ID: " + Thread.currentThread().getId());

        try {
            log.info("order-message-topic message received, orderId: {}", record.value().getOrderId());

            orderService.submitOrder(record.value());
            // 同步提交
            acknowledgment.acknowledge();
            log.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
        } catch (DuplicateKeyException dupe) {
            // 处理异常情况
            log.error("order-message-topic message error DuplicateKeyException", dupe);
            // 重复数据,忽略掉,同步提交
            acknowledgment.acknowledge();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1411147.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【AI Agent系列】【MetaGPT】9. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体(2)

文章目录 0. 前置推荐阅读和本文内容0.1 前置推荐阅读0.2 本文内容 1. 修改一&#xff1a;直接用大模型获取网页信息&#xff0c;不用爬虫程序1.1 我们要给大模型什么内容1.2 提取网页文本信息1.3 组织Action1.4 完整代码及细节注释1.5 可能存在的问题及思考 2. 修改二&#xf…

python:socket基础操作(3)-《udp接收消息》

收跟发基本核心思想差不多&#xff0c;只不过收信息需要去绑定一下端口&#xff0c;如果我们发信息没有绑定端口&#xff0c;那系统会随机分配一个&#xff0c;如果是收信息&#xff0c;那我们必须要求自己绑定端口才行 基础的接收数据 import socketudp_socket socket.socke…

活字格V9获取图片失败bug,报错404,了解存储路径,已改为批量上传和批量获取

项目场景&#xff1a; 问题描述 原因分析&#xff1a; 解决方案&#xff1a; 完成了批量上传功能&#xff0c;这插件真的很方便 于是写了个批量获取附件的js代码&#xff0c;我真厉害 项目场景&#xff1a; 活字格V9版本获取图片链接Upload 【9.0.103.0】图片上传的存储路…

nodeJs+express+Vue+MongoDB

数据库【Sqlite3、MongoDB、Mysql】简介&小记 Sqlite3&#xff1a; SQLite3是一个轻量级的数据库系统&#xff0c;它被设计成嵌入式数据库。这意味着它是一个包含在应用程序中的数据库&#xff0c;而不是独立运行的系统服务。适用场景&#xff1a;如小型工具、游戏、本地…

golang入门

学习方法 1、在实践中学 2、适当的囫囵吞枣&#xff0c;有可能学到后面&#xff0c;对前面的疑问焕然大悟 3、注重整体&#xff0c;刚开始不要去扣细节 安装 需要配置3个环境变量&#xff0c;如果.msi文件安装时设置好了就不需要了&#xff0c;自己可以检查下 GOROOT&…

C++学习| QT快速入门

QT简单入门 QT Creater创建QT项目选择项目类型——不同项目类型的区别输入项目名字和路径选择合适的构建系统——不同构建系统的却别选择合适的类——QT基本类之间的关系Translation File选择构建套件——MinGW和MSVC的区别 简单案例&#xff1a;加法器设计界面——构建加法器界…

【自动化测试】读写64位操作系统的注册表

自动化测试经常需要修改注册表 很多系统的设置&#xff08;比如&#xff1a;IE的设置&#xff09;都是存在注册表中。 桌面应用程序的设置也是存在注册表中。 所以做自动化测试的时候&#xff0c;经常需要去修改注册表 Windows注册表简介 注册表编辑器在 C:\Windows\regedit…

金蝶云星空 DynamicForm RCE漏洞复现

0x01 产品简介 金蝶云星空是一款云端企业资源管理(ERP)软件,为企业提供财务管理、供应链管理以及业务流程管理等一体化解决方案。金蝶云星空聚焦多组织,多利润中心的大中型企业,以 “开放、标准、社交”三大特性为数字经济时代的企业提供开放的 ERP 云平台。服务涵盖:财…

C语言练习题110例(十)

91.杨辉三角 题目描述: KK知道什么叫杨辉三角之后对杨辉三角产生了浓厚的兴趣&#xff0c;他想知道杨辉三角的前n行&#xff0c;请编程帮他 解答。杨辉三角&#xff0c;本质上是二项式(ab)的n次方展开后各项的系数排成的三角形。其性质包括&#xff1a;每行的端点数为1&…

设计模式_装饰器模式_Decorator

生活案例 咖啡厅 咖啡定制案例 在咖啡厅中&#xff0c;有多种不同类型的咖啡&#xff0c;客户在预定了咖啡之后&#xff0c;还可以选择添加不同的调料来调整咖啡的口味&#xff0c;当客户点了咖啡添加了不同的调料&#xff0c;咖啡的价格需要做出相应的改变。 要求&#xff…

如何使用Stable Diffusion的ReActor换脸插件

ReActor插件是从roop插件分叉而来的一个更轻便、安装更简单的换脸插件。操作简单&#xff0c;非常容易上手&#xff0c;下面我们就介绍一下&#xff0c;如何将ReActor作为stable diffusion的插件进行安装和使用。 一&#xff1a;安装ReActor插件 项目地址&#xff1a;https:/…

【Docker】实战案例 - 操作系统

作者主页&#xff1a; 正函数的个人主页 文章收录专栏&#xff1a; Docker 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01; 操作系统 目前常用的 Linux 发行版主要包括 Debian/Ubuntu 系列和 CentOS/Fedora 系列。 前者以自带软件包版本较新而出名&#xff1b;后者…

android学习笔记----SQLite数据库

用SQLite语句执行&#xff1a; 首先看到界面&#xff1a; 代码如下&#xff1a; MainActivity.java import android.support.v7.app.AppCompatActivity; import android.os.Bundle; import android.text.TextUtils; import android.view.View; import android.widget.EditTe…

SpringBoot_基础

学习目标 基于SpringBoot框架的程序开发步骤 熟练使用SpringBoot配置信息修改服务器配置 基于SpringBoot的完成SSM整合项目开发 一、SpringBoot简介 1. 入门案例 问题导入 SpringMVC的HelloWord程序大家还记得吗&#xff1f; SpringBoot是由Pivotal团队提供的全新框架&…

2024年前端会流行什么技术和框架了?

分享下一款面向研发开发使用、全源码支持、前后端一体的低代码工具JNPF&#xff0c;是引迈信息明星项目&#xff0c;拥有强大的可视化建模、数据库和API集成能力。 目前已有将近超千家企业将JNPF低代码开发工具融入内部研发体系&#xff0c;相较于传统的产研开发&#xff0c;使…

【oracle】oracle客户端及oracle连接工具

一、关于oracle客户端 1.1 Oracle Client 完整客户端 包含完整的客户端连接工具。 包很大&#xff0c;需要安装 1.2 instantclient 即时客户端 是 Oracle(R) 发布的轻量级数据库客户端&#xff0c;减少甚至只包含几个文件&#xff0c;您无需安装标准的客户端&#xff0c;就可以…

百度Apollo | 实车自动驾驶:感知、决策、执行的无缝融合

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《linux深造日志》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下…

MongoDB相关概念

1.1 业务应用场景 传统的关系型数据库&#xff08;如MySQL&#xff09;&#xff0c;在数据操作的“三高”需求以及应对Web2.0的网站需求面前&#xff0c;显得力不从心。“三高”需求&#xff1a; High performance - 对数据库高并发读写的需求。Huge Storage - 对海量数据的高…

论文阅读《thanking frequency fordeepfake detection》

这篇论文从频域的角度出发&#xff0c;提出了频域感知模型用于deepfake检测的模型 整体架构图&#xff1a; 1.FAD&#xff1a; 频域感知分解&#xff0c;其实就是利用DCT变换&#xff0c;将空间域转换为频域&#xff0c;变换后的图像低频信息在左上角&#xff0c;高频信息在右…

移动端短视频SDK,企业级视频编辑解决方案

短视频已成为企业营销、品牌推广、信息传递的重要手段&#xff0c;美摄科技凭借其在视频处理领域的深厚积累&#xff0c;推出了全方位的移动端短视频SDK方案&#xff0c;为企业提供从拍摄、特效、人脸道具到快速编辑的一站式解决方案。 1、拍摄与录制 美摄科技移动端短视频SD…