⚙️ 如何调整重试策略以适应不同的业务需求?

news2025/1/22 9:50:45

调整 Kafka 生产者和消费者的重试策略以适应不同的业务需求,需要根据业务的特性和容错要求来进行细致的配置。以下是一些关键的调整策略:

  1. 业务重要性

    • 对于关键业务消息,可以增加重试次数,并设置较长的重试间隔,以减少消息丢失的风险。
    • 对于非关键业务消息,可以减少重试次数或不进行重试,以避免不必要的资源消耗。
  2. 消息幂等性

    • 如果业务逻辑是幂等的,即多次处理相同消息不会导致业务状态不一致,可以增加重试次数。
    • 如果业务逻辑不是幂等的,需要谨慎设置重试策略,或者实现去重逻辑。
  3. 消息时效性

    • 对于时效性要求高的消息,可以减少重试间隔,以便快速尝试重新发送。
    • 对于时效性要求不高的消息,可以增加重试间隔,减少对 Kafka 集群的压力。
  4. 系统容量和负载

    • 根据 Kafka 集群和下游系统的容量和负载情况调整重试策略,避免因重试导致的额外负载影响系统稳定性。
  5. 错误类型

    • 对于临时性错误(如网络问题),可以设置较高的重试次数和较短的重试间隔。
    • 对于永久性错误(如消息格式错误),应减少重试次数,避免无意义的重试。
  6. 死信队列(DLQ)

    • 对于重试次数用尽后仍然发送失败的消息,可以配置死信队列进行存储,以便后续分析和处理。
  7. 监控和告警

    • 实施实时监控,对重试次数、失败率等关键指标进行监控,并设置告警阈值。
  8. 业务流程控制

    • 在业务流程中实现重试逻辑,例如在业务层捕获异常并根据业务规则进行重试。
  9. 自定义重试策略

    • 实现自定义的重试策略,例如指数退避策略,以适应特定的业务场景。
  10. 事务性消息

    • 如果业务要求消息发送的原子性,可以启用事务性消息发送,确保消息要么全部发送成功,要么全部不发送。
  11. 资源限制

    • 考虑到生产者和消费者的资源限制,如内存和网络带宽,合理设置重试策略,避免资源耗尽。
  12. 反馈机制

    • 建立反馈机制,根据业务运行情况和系统性能反馈调整重试策略。

通过综合考虑上述因素,可以为不同的业务需求定制合适的重试策略,以确保 Kafka 消息系统的高效性和可靠性。

在这里插入图片描述

以下是一些代码案例,展示了如何根据不同的业务需求调整 Kafka 生产者和消费者的重试策略

在这里插入图片描述

Kafka 生产者重试策略案例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomRetryProducerDemo {
    public static void main(String[] args) {
        // 配置生产者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "4.5.8.4:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("retries", 5); // 设置重试次数
        props.put("retry.backoff.ms", 1000); // 设置重试间隔为1秒
        props.put("buffer.memory", 33554432); // 设置缓冲区大小
        props.put("batch.size", 16384); // 设置批次大小
        props.put("linger.ms", 10); // 设置等待时间为10毫秒
        props.put("max.in.flight.requests.per.connection", 1); // 设置最大在途请求数

        // 创建生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        for (int i = 0; i < 100; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // 处理消息发送失败的情况
                    System.err.println("发送消息失败:" + exception.getMessage());
                } else {
                    // 处理消息发送成功的情况
                    System.out.println("消息发送成功,偏移量:" + metadata.offset());
                }
            });
        }

        // 关闭生产者
        producer.close();
    }
}

Kafka 消费者重试策略案例

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CustomRetryConsumerDemo {
    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "4.5.8.4:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.records", 500); // 设置每次拉取的最大记录数
        props.put("fetch.min.bytes", 1024); // 设置最小获取1KB的数据
        props.put("fetch.max.wait.ms", 500); // 设置最大等待500ms

        // 创建消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        // 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    // 处理消息
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    // 假设处理消息可能会失败
                    if (record.value().contains("error")) {
                        throw new RuntimeException("模拟处理消息失败");
                    }
                } catch (Exception e) {
                    // 处理消息失败,记录日志或重试
                    System.err.println("处理消息失败:" + e.getMessage());
                    // 可以在这里实现重试逻辑,例如将消息发送到死信队列
                }
            }
            // 批量提交偏移量
            consumer.commitSync();
        }
    }
}

死信队列(DLQ)案例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class DLQProducerDemo {
    public static void main(String[] args) {
        // 配置生产者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "4.5.8.4:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("retries", 5); // 设置重试次数
        props.put("retry.backoff.ms", 1000); // 设置重试间隔为1秒

        // 创建生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        for (int i = 0; i < 100; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // 处理消息发送失败的情况
                    System.err.println("发送消息失败:" + exception.getMessage());
                    // 将失败的消息发送到死信队列
                    ProducerRecord<String, String> dlqRecord = new ProducerRecord<>("test-topic-DLQ", key, exception.getMessage());
                    producer.send(dlqRecord);
                } else {
                    // 处理消息发送成功的情况
                    System.out.println("消息发送成功,偏移量:" + metadata.offset());
                }
            });
        }

        // 关闭生产者
        producer.close();
    }
}

这些代码案例展示了如何根据不同的业务需求调整 Kafka
生产者和消费者的重试策略,包括设置重试次数、重试间隔、处理消息发送失败的情况以及实现死信队列(DLQ)。希望这些示例能帮助您更好地理解和应用
Kafka 的重试机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2239841.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniCloud云对象调用第三方接口,根据IP获取用户归属地的免费API接口,亲测可用

需求 在2022年5月初&#xff0c;网络上各大平台上&#xff0c;都开始展示用户IP属地&#xff0c;在某音、某手等小视频平台以及各主流网站应用中&#xff0c;都展示IP归属地&#xff0c;如下图所示&#xff1a; 解决办法 收费文档的肯定有很多&#xff0c;基本你百度搜“归…

蓝桥杯PythonB组扫盲

题目分布&#xff08;参考2024年省赛&#xff09;&#xff1a;总共八道题&#xff0c;两填空8代码&#xff08;考察计算机基础知识和一些简单数学计算知识&#xff0c;不会太难&#xff0c;稍微准备下就行&#xff09;&#xff0c;六道程序设计题&#xff08;重点和难点&#x…

STM32单片机WIFI语音识别智能衣柜除湿消毒照明

实践制作DIY- GC0196-WIFI语音识别智能衣柜 一、功能说明&#xff1a; 基于STM32单片机设计-WIFI语音识别智能衣柜 二、功能介绍&#xff1a; STM32F103C系列最小系统板LCD1602显示器ULN2003控制的步进电机&#xff08;柜门开关&#xff09;5V加热片直流风扇紫外消毒灯DHT11…

git重置的四种类型(Git Reset)

git区域概念 1.工作区:IDEA中红色显示文件为工作区中的文件 (还未使用git add命令加入暂存区) 2.暂存区:IDEA中绿色(本次还未提交的新增的文件显示为绿色)或者蓝色(本次修改的之前版本提交的文件但本次还未提交的文件显示为蓝色)显示的文件为暂存区中的文件&#xff08;使用了…

MySQL技巧之跨服务器数据查询:基础篇-更新语句如何写

MySQL技巧之跨服务器数据查询&#xff1a;基础篇-更新语句如何写 上一篇已经描述&#xff1a;借用微软的SQL Server ODBC 即可实现MySQL跨服务器间的数据查询。 而且还介绍了如何获得一个在MS SQL Server 可以连接指定实例的MySQL数据库的连接名: MY_ODBC_MYSQL 以及用同样的…

C#入门 023 什么是类(Class)

什么是“类” 是一种数据结构 是一种数据类型 代表现实世界中的“种类” 构造器和析构器 析构器 析构器&#xff08;Destructor&#xff09;是一种特殊的成员方法&#xff0c;用于在对象被垃圾回收器&#xff08;Garbage Collector, GC&#xff09;回收之前执行清理操作。…

AXI DMA (一)

免责声明&#xff1a;本文所提供的信息和内容仅供参考。作者对本文内容的准确性、完整性、及时性或适用性不作任何明示或暗示的保证。在任何情况下&#xff0c;作者不对因使用本文内容而导致的任何直接或间接损失承担责任&#xff0c;包括但不限于数据丢失、业务中断或其他经济…

ubuntu 安装kafka-eagle

上传压缩包 kafka-eagle-bin-2.0.8.tar.gz 到集群 /root/efak 目录 cd /root/efak tar -zxvf kafka-eagle-bin-2.0.8.tar.gz cd /root/efak/kafka-eagle-bin-2.0.8 mkdir /root/efakmodule tar -zxvf efak-web-2.0.8-bin.tar.gz -C /root/efakmodule/ mv /root/efakmodule/efak…

TCP 三次握手意义及为什么是三次握手

✨✨✨励志成为超级技术宅 ✨✨✨ TCP的三次握手在笔试和面试中经常考察&#xff0c;非常重要&#xff0c;那么大家有没有思考过为什么是三次握手&#xff0c;俩次握手行不行呢&#xff1f;四次握手行不行呢&#xff1f;如果大家有疑问或者不是很理解&#xff0c;那么这篇博客…

vmware在全屏模式下快速切换回win桌面的方法

window上开发没有ubuntu下的方便&#xff0c;经常在window主机和ubuntu虚拟机直接切换太麻烦&#xff0c;每次得ctrlalt从虚拟机释放鼠标才可以切换&#xff0c;经过折腾发现以下几种方法可行 方法1 虚拟机监听切换按键并通知主机进行切换桌面 虚拟主机放在单独的一个桌面上并…

[Docker#8] 容器配置 | Mysql | Redis | C++ | 资源控制 | 命令对比

目录 一&#xff1a;Mysql 容器化安装 二&#xff1a;Redis 容器化安装 Redis 简介 Redis 容器创建 三&#xff1a;C容器制作 四&#xff1a;容器资源更新 常见问题 一&#xff1a;Mysql 容器化安装 进入 mysql 的镜像网站&#xff0c;查找 mysql 的镜像 mysql docker…

泷羽sec学习打卡-Linux基础

声明 学习视频来自B站UP主 泷羽sec,如涉及侵权马上删除文章 笔记的只是方便各位师傅学习知识,以下网站只涉及学习内容,其他的都与本人无关,切莫逾越法律红线,否则后果自负 关于Linux的那些事儿-Base 一、Linux-Base什么时openssl&#xff1f;有哪些加密参数&#xff1f;常用lin…

SpringBoot后端解决跨域问题

1.全局方式 新建一个conifg配置类&#xff0c;内容如下&#xff1a; Configuration public class CorsConfig implements WebMvcConfigurer {Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/**")//是否发送Cookie.allowCrede…

odoo17 owl 前端 顶部导航栏右侧添加自定义按钮

odoo17 前端 顶部导航栏右侧添加自定义按钮 先看图 很多时候都想要在这添加个自定义按钮或图标, 无穷下手添加 这里将展示如何在顶部header添加自定义 添加自定义模块 demo 目录结构如下 └─demo│ __init__.py│ __manifest__.py│├─static│ └─src│ ├─s…

Power bi中的lookupvalue函数

lookupvalue函数是一个非常实用的函数&#xff0c;它可用于在两个表之间查找相应的值。kagkupMalue函数可以将一个表中的列值作为参数传递给另一个表中的列&#xff0c;并返回在第二个表中与该值匹配的另一个列的值。在实践中&#xff0c;lookupvalue函数通常用于两个表之间的关…

golang分布式缓存项目 Day5 分布式节点

该项目原作者&#xff1a;https://geektutu.com/post/geecache-day1.html。本文旨在记录本人做该项目时的一些疑惑解答以及部分的测试样例以便于本人复习 1 流程回顾注&#xff1a; 我们在GeeCache 第二天 中描述了 geecache 的流程。在这之前已经实现了流程 ⑴ 和 ⑶&#xf…

[产品管理-76]:延续是创新与颠覆式创新的比较

目录 一、概述 1、定义与特征 2、市场影响与竞争策略 3、实施难度与风险 4、案例分析 二、示例 1. 延续性创新示例 2. 创新示例 3. 颠覆式创新示例 一、概述 延续性创新与颠覆式创新是技术创新领域的两种重要策略&#xff0c;它们在多个方面存在显著差异。 以下是对…

【 AI写作鹅-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…

AI:重塑电商行业的创新引擎,开启电商数字化转型新征程

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

语义分割数据增强,图像和标签同步对应详细增强教程(附代码)

&#x1f4aa; 专业从事且热爱图像处理&#xff0c;图像处理专栏更新如下&#x1f447;&#xff1a; &#x1f4dd;《图像去噪》 &#x1f4dd;《超分辨率重建》 &#x1f4dd;《语义分割》 &#x1f4dd;《风格迁移》 &#x1f4dd;《目标检测》 &#x1f4dd;《图像增强》 &a…