kafka-消费者-消费异常处理(SpringBoot整合Kafka)

news2025/4/16 12:59:49

文章目录

  • 1、消费异常处理
    • 1.1、application.yml配置
    • 1.2、注册异常处理器
    • 1.3、消费者使用异常处理器
    • 1.4、创建生产者发送消息
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:
      • 1.8.1、第一次启动SpringKafkaConsumerApplication
      • 1.8.n、第n次启动SpringKafkaConsumerApplication

1、消费异常处理

1.1、application.yml配置

server:
  port: 8120

# v1
spring:
  Kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed读事务已提交的消息 解决脏读问题
      isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
      # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
      # 调用ack方法时才会提交ack给kafka
#      enable-auto-commit: false
      # 消费者提交ack时多长时间批量提交一次
      auto-commit-interval: 1000
      # 消费者第一次消费主题消息时从哪个位置开始
      # earliest:从最早的消息开始消费
      # latest:第一次从LEO位置开始消费
      # none:如果主题分区没有偏移量,则抛出异常
      auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 手动ack:manual手动ack时 如果有异常会尝试一直消费
#      ack-mode: manual
      # 手动ack:消费有异常时停止
      ack-mode: manual_immediate

在这里插入图片描述

1.2、注册异常处理器

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
@Configuration
public class MyKafkaConfig {
    @Bean
    public NewTopic springTestPartitionTopic() {
        return TopicBuilder.name("my_topic1") //主题名称
                .partitions(3) //分区数量
                .replicas(3) //副本数量
                .build();
    }

    //方法名就是注入到容器中对象的名称
    @Bean
    public ConsumerAwareListenerErrorHandler myErrorHandler() {
        //创建异常处理器:消费者异常时 且注册使用当前异常处理器 会生效
        return new ConsumerAwareListenerErrorHandler() {

            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                System.out.println("出现异常,消息内容:value = " + message.getPayload());
                System.out.println("header = "+message.getHeaders());
                System.out.println("异常信息:" + e.getMessage());
                System.out.println("=================");
                return null;
            }
        };
    }
}

1.3、消费者使用异常处理器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListenerAck {
    /**
     * 自动ack可能会导致漏消息
     *      spring-kafka:
     *                     自动ack 如果有异常,会死循环获取消息重新消费
     *                        不能继续向后消费消息,会导致消息积压
     *
     *                    手动ack 配置了手动ack,且ack-mode为manual_immediate时,
     *                        如果消息消费失败,会继续向后消费
     * @param record
     */
    @KafkaListener(
            topicPartitions = {
                    @TopicPartition(topic = "my_topic1",partitions = {"0"})}
            , groupId = "my_group1",errorHandler = "myErrorHandler")
    public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("my_group1消费者获取分区0的消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
        int i = 1/0;
        // 手动ack:手动确认消息已经消费 broker 会提交消费者偏移量
        acknowledgment.acknowledge();
    }
}

在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class SpringKafkaConsumerApplicationTests {
    @Resource
    KafkaTemplate kafkaTemplate;

    @Test
    void contextLoads() {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("my_topic1",i%3,"", "指定ack-mode: manual_immediate消费"+i);
        }
    }

}

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }

}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.8、消费者控制台:

1.8.1、第一次启动SpringKafkaConsumerApplication

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=91c96206-2381-5fa1-b391-52627056762f, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=8755d954-615c-37ff-67f0-85521e090b03, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=68082673-07d2-6f69-3935-c7034a7caf81, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=bcbda053-9536-df91-d937-2688b5d4c6ea, timestamp=1717672488754}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================

1.8.n、第n次启动SpringKafkaConsumerApplication

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=6bc9edb9-ad1c-e00e-9b27-c0c540248091, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=14e6951b-b25f-10ca-702c-a1699d25645b, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=26e85ef8-2502-fd29-8d5b-091ec0362900, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=25c6b6c8-d02d-2091-7c6f-ebaea6c52d1d, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================

此时如果不关闭SpringKafkaConsumerApplication,生产者继续发送消息,消费者只会往后消费,不会从头再次消费

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1795556.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【WP】猿人学_17_天杀的Http2.0

https://match.yuanrenxue.cn/match/17 抓包分析 居然对Fiddler有检测&#xff0c;不允许使用 那就使用浏览器抓包&#xff0c;好像没发现什么加密参数&#xff0c;然后重放也可以成功&#xff0c;时间长了也无需刷新页面&#xff0c;尝试Python复现。 Python复现 import …

在线标注流程

文章目录 在线标注流程标注方法 在线标注流程 登录地址&#xff1a;http://7a27c5e078f644a2a9b734603913c65e.login.bce.baidu.com 出现页面&#xff1a; 登录名&#xff1a; 三个中任意一个 密码&#xff1a;ZNSJ123a 登录之后叉掉。再打开这个网站&#xff1a;https://…

CAD 文件(DXF / DWG)转换为(DXF / PDF / PNG / SVG)

方法一Github 这个是ezdxf出品的&#xff0c;可以使用命令行的方式进行转换 ezdxf draw -o file.<png|svg|pdf> <file.dxf>也可以自己改动代码 examples/addons/drawing/pdf_export.py 但是直接运行会有误&#xff0c;以下是我改动后的代码&#xff1a; from ez…

ArrayList——简单洗牌算法

特殊语法介绍&#xff1a; List<List<E>> 该语法情况比较特殊&#xff0c;相当于一个“二维数组”存着一个个线性表的结构&#xff0c;如图&#xff1a; 该语法的灵活性强&#xff0c;可适用于多种类型和多种情况。接下来就使用该语法来实现一个简单的洗牌操作。…

【Linux取经路】网络套接字编程——初识篇

文章目录 一、端口号1.1 认识端口号1.2 端口号 VS 进程 PID 二、认识 TCP 协议三、认识 UDP四、网络字节序列五、socket 编程接口5.1 常用 API5.2 sockaddr 结构 六、结语 一、端口号 网络通信的本质是应用层软件进行数据的发送和接受&#xff0c;软件在启动之后&#xff0c;本…

快速自定义表单开发的优势介绍

进行高效率的办公是很多职场人的梦想。借助什么样的软件平台可以提质增效&#xff1f;低代码技术平台是当前较为流行的办公软件平台产品&#xff0c;具有灵活性、易操作、好维护等多个优势特点&#xff0c;操作人员只需要像搭积木似地操作&#xff0c;就可以搭建属于客户的个性…

[element-ui]el-form自定义校验-图片上传验证(手动触发部分验证方法)

背景&#xff1a; 在做导入文件功能的时候&#xff0c;需要校验表单&#xff0c;如图所示 店铺字段绑定在表单数据对象上&#xff0c;在点击确定的时候正常按照表单验证规则去校验&#xff0c;就不再赘述。 文件上传是个异步过程&#xff0c;属性值改变后不会去触发验证规则…

一.网络基础——OSI七层模型

一.OSI七层模型 OSI&#xff08;Open System Interconnection&#xff0c;开放系统互连&#xff09;七层网络模型被称为开放式系统互联参考模型&#xff0c;它是一个逻辑上的定义和规范; 它把网络从逻辑上分为了7层. 每一层都有相关、相对应的物理设备&#xff0c;比如路由器&…

[数据集][目标检测]焊接处缺陷检测数据集VOC+YOLO格式3400张8类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;3400 标注数量(xml文件个数)&#xff1a;3400 标注数量(txt文件个数)&#xff1a;3400 标注…

FFmpeg播放器的相关概念【1】

播放器框架 相关术语 •容器&#xff0f;文件&#xff08;Conainer/File&#xff09;&#xff1a;即特定格式的多媒体文件&#xff0c;比如mp4、flv、mkv等。 • 媒体流&#xff08;Stream&#xff09;&#xff1a;表示时间轴上的一段连续数据&#xff0c;如一段声音数据、一段…

6.6学习总结

一.算法练习(Codeforces Round 949 (Div. 2)和) B. Turtle and an Infinite Sequence 思路&#xff1a;对于数字而言&#xff0c;轮之后的结果是所有数的或。因此只需要求区间或就行了。(其实就是找区间左边界,二进制中的特殊位置,将后面的所有位都变成1,最后输出结果) 代码实…

关于计算机是如何工作的

计算机的发展历程 世界上的第一个计算机 冯诺依曼机构体系 1.存储器 (包括内存(存储空间小,访问速度快,成本高,掉电后数据丢失) 外存(硬盘,软盘,U盘,光盘)),存储空间小,访问速度慢,成本低,掉电后数据仍在 2.CPU(中央处理单元,计算机最核心的部分,用于算术运算和逻辑判断),…

【ARM Cache 及 MMU 系列文章 6.2 -- ARMv8/v9 Cache 内部数据读取方法详细介绍】

请阅读【ARM Cache 及 MMU/MPU 系列文章专栏导读】 及【嵌入式开发学习必备专栏】 文章目录 Direct access to internal memoryL1 cache encodingsL1 Cache Data 寄存器Cache 数据读取代码实现Direct access to internal memory 在ARMv8架构中,缓存(Cache)是用来加速数据访…

算法金 | 再见!!!KNN

大侠幸会&#xff0c;在下全网同名「算法金」 0 基础转 AI 上岸&#xff0c;多个算法赛 Top 「日更万日&#xff0c;让更多人享受智能乐趣」 KNN算法的工作原理简单直观&#xff0c;易于理解和实现&#xff0c;这使得它在各种应用场景中备受青睐。 我们将深入探讨KNN算法&…

带你了解消防安全与应急救援,2024北京消防展6月盛大开启

带你了解消防安全与应急救援&#xff0c;2024北京国际消防展6.26盛大开启 在日益关注安全问题的今天&#xff0c;消防安全与应急救援已经成为社会发展的重要一环。为了提高全民消防安全意识&#xff0c;推动应急救援技术的发展&#xff0c;2024年北京国际消防展将于6月26日盛大…

Docker自定义镜像实现(SpringBoot程序为例)

✅作者简介&#xff1a;大家好&#xff0c;我是 Meteors., 向往着更加简洁高效的代码写法与编程方式&#xff0c;持续分享Java技术内容。&#x1f34e;个人主页&#xff1a;Meteors.的博客&#x1f49e;当前专栏&#xff1a;知识备份✨特色专栏&#xff1a;知识分享&#x1f96…

【窗口函数的详细使用】

前言&#xff1a; &#x1f49e;&#x1f49e;大家好&#xff0c;我是书生♡&#xff0c;今天主要和大家分享一下可MySQL中的窗口函数的概念&#xff0c;语法以及常用的窗口函数,希望对大家有所帮助。感谢大家关注点赞。 &#x1f49e;&#x1f49e;前路漫漫&#xff0c;希望大…

(面试官问我微服务与naocs的使用我回答了如下,面试官让我回去等通知)微服务拆分与nacos的配置使用

微服务架构 正常的小项目就是所有的功能集成在一个模块中&#xff0c;这样代码之间不仅非常耦合&#xff0c;而且修改处理的时候也非常的麻烦&#xff0c;应对高并发时也不好处理&#xff0c;所以 我们可以使用微服务架构&#xff0c;对项目进行模块之间的拆分&#xff0c;每一…

Nacos服务管理

1.前言 在当前的分布式系统和微服务架构中&#xff0c;服务发现和管理变得至关重要。Nacos&#xff08;NAme COntrol Service&#xff09;作为一个开源的动态服务发现、配置管理和服务管理平台&#xff0c;为解决这些挑战提供了一站式的解决方案。其提供的负载均衡、分环境管理…

接口自动化框架封装思想建立(全)

httprunner框架&#xff08;上&#xff09; 一、什么是Httprunner&#xff1f; 1.httprunner是一个面向http协议的通用测试框架&#xff0c;以前比较流行的是2.X版本。 2.他的思想是只需要维护yaml/json文件就可以实现接口自动化测试&#xff0c;性能测试&#xff0c;线上监…