kafka-重试和死信主题(SpringBoot整合Kafka)

news2025/1/24 2:10:15

文章目录

  • 1、重试和死信主题
  • 2、死信队列
  • 3、代码演示
    • 3.1、appication.yml
    • 3.2、引入spring-kafka依赖
    • 3.3、创建SpringBoot启动类
    • 3.4、创建生产者发送消息
    • 3.5、创建消费者消费消息

1、重试和死信主题

kafka默认支持重试和死信主题

  1. 重试主题:当消费者消费消息异常时,手动ack如果有异常继续向后消费,为了保证消息尽可能成功消费,可以将消费异常的消息扔到重试主题,再通过重试主题的消费者尝试消费
  2. 死信主题:当一个消息消费多次仍然失败,可以将该消息存到死信主题。避免漏消息,以后可以人工介入手动解决消息消费失败的问题

2、死信队列

在Kafka中,DLT通常指的是 Dead Letter Topic(死信队列)

Dead Letter Topic(DLT)的定义与功能

  1. 背景:原生Kafka是不支持Retry Topic和DLT的,但Spring Kafka在客户端实现了这两个功能。
  2. 功能:当消息在Kafka中被消费时,如果消费逻辑抛出异常,并且重试策略(如默认重试10次后)仍无法成功处理该消息,Spring Kafka会将该消息发送到DLT中。
  3. 自定义处理:可以通过自定义SeekToCurrentErrorHandler来控制消费失败后的处理逻辑,例如添加重试间隔、设置重试次数等。如果在重试后消息仍然消费失败,Spring Kafka会将该消息发送到DLT。

DLT的使用与意义

  1. 错误处理:DLT为Kafka提供了一种机制来处理那些无法被成功消费的消息,从而避免了消息的丢失或阻塞。
  2. 监控与告警:通过对DLT的监控,可以及时发现并处理那些无法被成功消费的消息,从而保障Kafka系统的稳定性和可靠性。
  3. 二次处理:对于发送到DLT的消息,可以进行二次处理,如手动干预、修复数据等,以确保这些消息能够最终得到处理。

总之,在Kafka中,DLT是一个用于处理无法被成功消费的消息的特殊Topic,它提供了一种灵活且可靠的机制来保障Kafka系统的稳定性和可靠性。

3、代码演示

3.1、appication.yml

server:
  port: 8120
# v1
spring:
  Kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed读事务已提交的消息 解决脏读问题
      isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
      # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
      # 调用ack方法时才会提交ack给kafka
#      enable-auto-commit: false
      # 消费者提交ack时多长时间批量提交一次
      auto-commit-interval: 1000
      # 消费者第一次消费主题消息时从哪个位置开始
      # earliest:从最早的消息开始消费
      # latest:第一次从LEO位置开始消费
      # none:如果主题分区没有偏移量,则抛出异常
      auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # json反序列化器
#      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
    listener:
      # 手动ack:manual手动ack时 如果有异常会尝试一直消费
#      ack-mode: manual
      # 手动ack:消费有异常时停止
      ack-mode: manual_immediate


3.2、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.3、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }

}

3.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.time.LocalDateTime;
@SpringBootTest
public class KafkaProducerApplicationTests {
    @Resource
    KafkaTemplate kafkaTemplate;

    @Test
    void sendRertyMsg(){
        kafkaTemplate.send("retry_topic",0,"","重试机制和死信队列"+"_"+ LocalDateTime.now());
    }
}

在这里插入图片描述

没有指定分区,默认会创建一个分区,即使此时有3个kafka实例,也只会用一个,因为只有一个分区

在这里插入图片描述
在这里插入图片描述

[
  [
    {
      "partition": 0,
      "offset": 0,
      "msg": "重试机制和死信队列_2024-06-07T15:49:37.398841600",
      "timespan": 1717746578357,
      "date": "2024-06-07 07:49:38"
    }
  ]
]

3.5、创建消费者消费消息

给一个消费者配重试主题的时候,死信主题消费者一般不写

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.annotation.Backoff;
@Component
public class MyKafkaListenerAck {
    /**
     * 自动ack可能会导致漏消息
     *      spring-kafka:
     *                     自动ack 如果有异常,会死循环获取消息重新消费
     *                        不能继续向后消费消息,会导致消息积压
     *
     *                    手动ack 配置了手动ack,且ack-mode为manual_immediate时,
     *                        如果消息消费失败,会继续向后消费
     * @param record
     */
    //指定消费者消费异常使用重试+死信主题   必须结合手动ack使用
    @RetryableTopic(
            attempts = "3",
            numPartitions = "3",  //重试主题的分区数量
            backoff =@Backoff(value = 2_000L),  //重试的时间间隔
            autoCreateTopics ="true",
            retryTopicSuffix = "-myRetry",  //指定重试主题创建时的后缀名:使用原主题的名称拼接后缀生成名称 -retry
            dltTopicSuffix = "-myDlt" //指定DLT主题创建时的后缀名:使用原主题的名称拼接后缀生成名称 -dlt
    )
    @KafkaListener(groupId = "my_group1"
    ,topicPartitions = {@TopicPartition(topic = "retry_topic",partitions = {"0","1","2"})})
    public void consumeByRetry(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("consumeByRetry消费者获取到消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
        int i = 1/0;
        //手动ack
        ack.acknowledge();
    }

   /* //死信队列默认名称在原队列后拼接'-dlt'
    @KafkaListener(groupId = "my_group2"
            ,topicPartitions = {@TopicPartition(topic = "topic_dlt",partitions = {"0","1","2"})})
    public void consumeByDLT(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("consumeByDLT消费者获取到消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
        //手动ack
        ack.acknowledge();
    }*/

}

此时运行SpringKafkaConsumerApplication,控制台会报错,因为我们手动写了异常 int i=1/0

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

[
  [
    {
      "partition": 0,
      "offset": 0,
      "msg": "重试机制和死信队列_2024-06-07T15:49:37.398841600",
      "timespan": 1717746979414,
      "date": "2024-06-07 07:56:19"
    }
  ]
]

我们发现原本在重试主题中的消息,死信主题也有一份

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1801209.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于思通数科大模型的设备隐患智能检测:图像处理与声音分析的融合应用

在现代工业生产中&#xff0c;设备的稳定运行对保障生产效率和产品质量至关重要。然而&#xff0c;设备的老化、磨损以及异常状态的检测往往需要大量的人力和物力。思通数科大模型结合图像处理技术和声音分析技术&#xff0c;为设备隐患检测提供了一种自动化、高效的解决方案。…

源码、反码和补码

对于有符号数而言&#xff0c;原码就是一个数的二进制表示。二进制的最高位是符号位&#xff0c;0 表示正数&#xff0c;1 表示负数。 计算机用数的原码进行显示&#xff0c;数的计算和存储是用补码进行的。 正数的原码&#xff0c;反码和补码都一样&#xff0c;即正数三码合…

Matching Anything by Segmenting Anything

摘要 在复杂场景中跨视频帧稳健地关联相同对象是许多应用的关键&#xff0c;特别是多目标跟踪&#xff08;MOT&#xff09;。当前方法主要依赖于标注的特定领域视频数据集&#xff0c;这限制了学习到的相似度嵌入的跨域泛化能力。我们提出了MASA&#xff0c;一种新颖的方法用于…

JavaScript 动态网页实例 —— 图像显示

图像是网页设计中必不可少的内容之一,而图像的显示方式更是关系到网站的第一印象。本章介绍图像的显示,主要包括:图片的随机显示、图像的显示和隐藏、图像的滚动显示、图像的探照灯扫描显示、多幅图像的翻页显示、图像的水纹效果显示、全景图效果显示手电照射效果显示以及雷达…

揭秘800G以太网——简介

什么是800G以太网&#xff1f; 800G以太网是一种高带宽以太网标准&#xff0c;每秒可传输800 Gbps&#xff08;千兆位每秒&#xff09;的数据速率。它代表了以太网技术的又一进步&#xff0c;旨在满足不断增长的数据传输需求以及处理大量数据的能力。因此&#xff0c;800G以太…

杰理AC632N提升edr的hid传输速率, 安卓绝对坐标触摸点被识别成鼠标的修改方法

第一个问题: 首先修改edr的hid传输速率.修改你的板级配置,里面的一个地方给注释掉了,请打开那个注释就能提升edr的hid传输效率了 第二个问题: 修改632n系别把触摸板的hid报告描述符识别成鼠标点,修改如下: 注释掉上面的pnp,改成下面的

RocketMQ的安装

首先到RocketMQ官网下载页面下载 | RocketMQ (apache.org)&#xff0c;本机解压缩&#xff0c;作者在这里用的是最新的5.2.0版本。按照如下步骤安装。 1、环境变量配置rocket mq地址 ROCKETMQ_HOME D:\rocketmq-all-5.2.0-bin-release 在变量path中添加”%ROCKETMQ_HOME%\bi…

04 架构核心技术之分布式消息队列

本课时的主题是分布式消息队列&#xff0c;分布式消息队列的知识结构如下图。 本课时主要介绍以下内容。 同步架构和异步架构的区别。异步架构的主要组成部分&#xff1a;消息生产者、消息消费者、分布式消息队列。异步架构的两种主要模型&#xff1a;点对点模型和发布订阅模型…

RandomDate(接口参数化-随机生成日期)

目录 1、入口位置&#xff1a;2、验证函数生成值3、获取 年月日时分秒 的全随机4、时间函数 前言&#xff1a;有时候我们做性能测试或者接口测试时&#xff0c;参数需要传入日期格式&#xff0c;但是又不想每次都是用同一个日期&#xff0c;我们就可以使用Jmeter工具中函数助手…

[MQTT]服务器EMQX搭建SSL/TLS连接过程(wss://)

&#x1f449;原文阅读 &#x1f4a1;章前提示 本文采用8084端口进行连接&#xff0c;是EMQX 默认提供了四个常用的监听器之一&#xff0c;如果需要添加其他类型的监听器&#xff0c;可参考官方文档&#x1f517;管理 | EMQX 文档。 本文使用自签名CA&#xff0c;需要提前在L…

三次谐波式发电机定子单相接地保护Simulink仿真

在用于接地保护的发电机定子回路的仿真模型的基础上增加三次谐波电动势,得到用于仿真三次谐波式接地保护的发电机定子回路的Simulink仿真模型,如图1所示。 图 1发电机定子回路的Simulink仿真模型 发电机端和中性点侧的三次谐波电压的获取采用如图2所示的方法。 图 2 …

校园生活服务平台的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;用户管理&#xff0c;跑腿管理&#xff0c;文娱活动管理&#xff0c;活动申请管理&#xff0c;备忘录管理 前台账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff…

使用wheelnav.js构建酷炫的动态导航菜单

目录 前言 一、WheelNav是什么 1、项目地址 2、关于开源协议 3、相关目录介绍 二、如何使用wheelnav.js 1、新建html页面 2、设置style样式 3、创建展示元素实现动态导航 三、参数即方法介绍 1、参数列表 2、运行方法 3、实际成果 四、总结 前言 用户体验永远是一…

数据结构和算法一轮

前言 本文参考《2025年数据结构考研复习指导&#xff08;王道论坛组编&#xff09;》和相关文章&#xff0c;为考试前复习而写。 目录 前言 第一章线性表 1.1顺序表 1.2单链表 1.3循环链表 ​1.4双向链表 第二章栈和队列 2.1栈 2.2共享栈 2.3链栈 2.4队列 2.5循环…

大学生创新与创业搜题软件?推荐7个搜题软件和学习工具 #媒体#知识分享

随着大学课程的增多和知识的不断积累&#xff0c;大学生们常常面临着繁重的作业和复杂的题目。为了解决这一问题&#xff0c;许多大学生搜题软件应运而生。 1.彩虹搜题 这个是公众号 个性化推荐功能&#xff0c;精准满足需求。更高效地获取你想要的答案。 下方附上一些测试的…

项目质量保证措施(Word原件)

一、 质量保障措施 二、 项目质量管理保障措施 &#xff08;一&#xff09; 资深的质量经理与质保组 &#xff08;二&#xff09; 全程参与的质量经理 &#xff08;三&#xff09; 合理的质量控制流程 1&#xff0e; 质量管理规范&#xff1a; 2&#xff0e; 加强协调管理&…

高通CSIPHY combo mode介绍

目录 使用MIPI Switch 使用高通平台CSIPHY的Combo Mode YYYY使用Combo Mode电路图如下: 如何设置combo PHY mode CSIInfo configuration when camera works in normal mode 平台SoC一般都有多个CSIPHY以满足当前手机相机设计多摄的情况,但是一款SoC CSIPHY的个数也是一定…

Nested KVM Hypervisor Support

​​​​​​​Description Nested KVM是指基于虚拟化技术的虚拟机管理系统。 Nested KVM在Intel处理器上&#xff0c;KVM使用Intel的vmx&#xff08;virtualmachine eXtensions&#xff09;来提高虚拟机性能&#xff0c;即硬件辅助虚拟化技术。如果一台虚拟机能够和物理机一…

从VS Code源码看清晰代码之美

VS Code的产品做的很优秀&#xff0c;其源码也质量颇高&#xff0c;清晰、整洁、富有美感。 下面是 src\vs\workbench\common\notifications.ts 文件中的两段代码&#xff0c;大家感受一下&#xff1a; get sticky(): boolean {if (this._sticky) {return true; // explicitl…

【开源】课程智能组卷系统 SSM+JSP+MySQL

目录 一、项目介绍 学生模块 老师模块 试卷模块 试题模块 考试模块 二、项目界面 三、核心代码 一、项目介绍 经典老框架SSM打造入门项目《课程智能组卷系统》,可以给管理员们、学生、教师使用&#xff0c;包括学生模块、老师模块、试卷模块、试题模块、考试模块、公告…