详解kafka消息发送重试机制的案例

news2024/11/16 17:33:56

在 Kafka 生产者中实现消息发送的重试机制,可以通过配置 KafkaProducer 的相关属性来实现。以下是一些关键的配置项:

retries:设置生产者发送失败后重试的次数。

retry.backoff.ms:设置生产者在重试前等待的时间。

buffer.memory:设置生产者在内存中缓存数据的最大值,如果达到这个值,生产者会拒绝接受新的消息,直到当前缓存的消息被发送出去。

batch.size:设置生产者在发送批次中可以包含的最大消息数。

linger.ms:设置生产者在发送批次之前等待更多消息的最大时间。

max.in.flight.requests.per.connection:设置每个连接最多数未完成的请求

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) {
        // 配置生产者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "4.5.8.4:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("retries", 5); // 设置重试次数
        props.put("retry.backoff.ms", 100); // 设置重试间隔
        props.put("buffer.memory", 33554432); // 设置缓冲区大小
        props.put("batch.size", 16384); // 设置批次大小
        props.put("linger.ms", 1); // 设置等待时间
        props.put("max.in.flight.requests.per.connection", 5); // 设置最大在途请求数

        // 创建生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        for (int i = 0; i < 1000000; i++) {
            String key = "案例1=====" + i;
            System.out.println("key:"+key);
            String value = "Spring AI Alibaba 实现了与阿里云通义模型的完整适配,接下来,我们将学习如何使用 spring ai alibaba 开发一个基于通义模型服务的智能聊天应用" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // 处理消息发送失败的情况
                    System.err.println("发送消息失败:" + exception.getMessage());
                } else {
                    // 处理消息发送成功的情况
                    System.out.println("消息发送成功,偏移量:" + metadata.offset());
                }
            });
        }

        // 关闭生产者
        producer.close();
    }
}

在这个示例中,我们设置了重试次数、重试间隔、缓冲区大小、批次大小、等待时间和最大在途请求数。此外,我们还为 send 方法提供了一个回调函数,用于处理消息发送成功或失败的情况。这样,当消息发送失败时,生产者会自动重试,直到达到配置的重试次数。如果所有重试都失败,回调函数会收到异常通知,你可以在回调中实现进一步的错误处理逻辑。

🔍 如何配置Kafka生产者的重试策略?

其实上面也有说,再次总结下

要配置 Kafka 生产者的重试策略,你可以按照以下步骤进行:

  1. 设置重试次数

    • 通过设置 retries 属性来指定生产者在遇到错误时重试发送消息的次数。例如,设置 retries 为 3 表示生产者会尝试最多 3 次发送消息。
  2. 设置重试间隔

    • 使用 retry.backoff.ms 属性来配置重试之间的时间间隔。这个设置可以防止生产者在连续的短时间内发送大量重试请求,给 Kafka 集群或网络造成压力。
  3. 确保消息幂等性

    • 设置 enable.idempotencetrue 以确保生产者发送消息的逻辑是幂等的,即使消息被重复发送也不会影响系统状态。
  4. 配置确认策略

    • 通过 acks 属性来确保消息被所有副本确认。例如,设置 acks 为 “all” 可以确保消息被所有副本确认后才认为是成功发送。
  5. 异步发送与回调

    • 使用异步发送消息,并在回调中处理发送失败的情况。在回调中对异常进行分类处理,对于可恢复的错误进行重试,对于不可恢复的错误进行日志记录或报警。
  6. 错误处理与日志记录

    • 在回调函数中捕获并处理异常,同时记录详细的错误日志,便于问题排查和监控。
  7. 监控与告警

    • 对生产者的关键性能指标进行监控,如发送延迟、吞吐量等。当指标出现异常时,及时触发告警通知相关人员处理。
  8. 合理配置重试机制

    • 根据业务需求合理配置重试次数和重试间隔,以减少因网络波动或 Kafka 集群短暂不可用导致的消息丢失风险。
  9. 设置最大在途请求

    • 通过 max.in.flight.requests.per.connection 属性限制每个连接最多数未完成的请求,这有助于控制内存使用和重试的并发量。
  10. 配置超时时间

    • Kafka 2.4 版本引入了 delivery.timeout.ms 参数,它设置了发送记录和接收确认之间的超时时间。这个参数与 retries 结合使用,可以提供更灵活的重试控制。

通过上述配置,你可以为 Kafka 生产者设置一个健壮的重试策略,以确保在面对网络问题或 Kafka 集群短暂不可用时,消息能够被可靠地发送。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2241644.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据学习14之Scala面向对象--至简原则

1.类和对象 1.1基本概念 面向对象&#xff08;Object Oriented&#xff09;是一种编程思想&#xff0c;面向对象主要是把事物给对象化&#xff0c;包括其属性和行为。面向对象编程更贴近实际生活的思想&#xff0c;总体来说面向对象的底层还是面向过程&#xff0c;面向过程抽象…

pipx安装提示找不到包

执行&#xff1a; pipx install --include-deps --force "ansible6.*"WARNING: Retrying (Retry(total4, connectNone, readNone, redirectNone, statusNone)) after connection broken by NewConnectionError(<pip._vendor.urllib3.connection.HTTPSConnection …

‘conda‘ 不是内部或外部命令,也不是可运行的程序或批处理文件,Miniconda

下载了conda&#xff0c;但是在cmd里执行conda --version会显示’conda’ 不是内部或外部命令&#xff0c;也不是可运行的程序或批处理文件。 原因是环境变量里没有添加conda&#xff0c;无法识别路径。 需要在系统环境变量里添加如下路径&#xff1a; 保存之后重新打开cmd&am…

【Qt】使用QString的toLocal8Bit()导致的问题

问题 使用Qt发送一个Http post请求的时候&#xff0c;服务一直返回错误和失败信息。同样的url以及post参数&#xff0c;复制黏贴到postman里就可以发送成功。就感觉很神奇。 原因 最后排查出原因是因为参数中含有汉字而导致的编码问题。 在拼接post参数时&#xff0c;使用了…

设计一致性的关键:掌握 Axure 母版使用技巧

设计一致性的关键&#xff1a;掌握 Axure 母版使用技巧 前言 在快节奏的产品开发周期中&#xff0c;设计师们一直在寻找能够提升工作效率和保持设计一致性的方法。 Axure RP&#xff0c;作为一款强大的原型设计工具&#xff0c;其母版功能为设计师们提供了一个强大的解决方案…

鸿蒙next ui安全区域适配(刘海屏、摄像头挖空等)

目录 相关api 团结引擎对于鸿蒙的适配已经做了安全区域的适配&#xff0c;也考虑到了刘海屏和摄像机挖孔的情况&#xff0c;在团结引擎内可以直接使用Screen.safeArea 相关api 团结引擎对于鸿蒙的适配已经做了安全区域的适配&#xff0c;也考虑到了刘海屏和摄像机挖孔的情况&am…

Android OpenGL ES详解——实例化

目录 一、实例化 1、背景 2、概念 实例化、实例数量 gl_InstanceID 应用举例 二、实例化数组 1、概念 2、应用举例 三、应用举例——小行星带 1、不使用实例化 2、使用实例化 四、总结 一、实例化 1、背景 假如你有一个有许多模型的场景&#xff0c;而这些模型的…

前端传数组 数据库存Json : [1,2,3]格式

一、前端正常传数组&#xff0c;但是value.toString() 即可 const empIds ref([1,2,3]) empIds.value empIds.value.toString() await updateApiRules(empIds.value) // 接口传参 二、后端用String类型接收后转换 String[] empIds updateDO.getEmpId().split("&#x…

《Java核心技术 卷I》用户图形界面鼠标事件

鼠标事件 如果只希望用户能够点击按钮或菜单&#xff0c;那么就不需要显式地处理鼠标事件&#xff0c;鼠标操作将由用户界面中的各种组件内部处理&#xff0c;不过&#xff0c;如果希望用户能使用鼠标画图&#xff0c;就需要捕获鼠标移动&#xff0c;点击和拖动事件。 本节&am…

贪心算法入门(三)

相关文章 贪心算法入门&#xff08;一&#xff09;-CSDN博客 贪心算法入门&#xff08;二&#xff09;-CSDN博客 1.什么是贪心算法&#xff1f; 贪心算法是一种解决问题的策略&#xff0c;它将复杂的问题分解为若干个步骤&#xff0c;并在每一步都选择当前最优的解决方案&am…

企业知识中台:构建智慧企业的核心

在当今数字化时代&#xff0c;企业知识中台已成为构建智慧企业的核心。它不仅是企业知识资产的集中地&#xff0c;也是推动企业创新和提高决策效率的关键。本文将分为四个部分&#xff0c;详细探讨知识中台的概念、重要性、构建步骤以及如何利用HelpLook工具搭建企业知识库。 …

基于Spring Boot的在线性格测试系统设计与实现(源码+定制+开发)智能性格测试与用户个性分析平台、在线心理测评系统的开发、性格测试与个性数据管理系统

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…

机器学习 ---线性回归

目录 摘要&#xff1a; 一、简单线性回归与多元线性回归 1、简单线性回归 2、多元线性回归 3、残差 二、线性回归的正规方程解 1、线性回归训练流程 2、线性回归的正规方程解 &#xff08;1&#xff09;适用场景 &#xff08;2&#xff09;正规方程解的公式 三、衡量…

shell脚本(1)

声明&#xff1a;学习视频来自b站up主 泷羽sec&#xff0c;如涉及侵权马上删除文章 感谢泷羽sec 团队的教学 视频地址&#xff1a;shell脚本&#xff08;1&#xff09;脚本创建执行与变量使用_哔哩哔哩_bilibili 本文主要讲解shell脚本的创建、执行和变量的使用。 一、脚本执行…

测试实项中的偶必现难测bug--互斥逻辑异常

问题: 今天线上出了一个很奇怪的问题,看现象和接口是因为数据问题导致app模块奔溃 初步排查数据恢复后还是出现了数据重复的问题,查看后台实际只有一条数据,但是显示在app却出现了两条一模一样的置顶数据 排查: 1、顺着这个逻辑,我们准备在预发复现这个场景,先是cop…

解决MySQL中整型字段条件判断禁用不生效的问题

MySQL中&#xff0c;当尝试将整数与字符串进行比较时&#xff0c;数据库可能会尝试将字符串转换为整数。在这种情况下&#xff0c;空字符串会被转换为整数0&#xff0c;所以0 ! 会被解释为0 ! 0&#xff0c;结果自然是false。 在开发过程中&#xff0c;我们经常需要对数据库中的…

Flink1.19编译并Standalone模式本地运行

1.首先下载源码 2.本地运行 新建local_conf和local_lib文件夹&#xff0c;并且将编译后的文件放入对应的目录 2.1 启动前参数配置 2.1.2 StandaloneSessionClusterEntrypoint启动参数修改 2.1.3 TaskManagerRunner启动参数修改 和StandaloneSessionClusterEntrypoint一样修改…

创建vue插件,发布npm

开发步骤&#xff1a;1.创建一个vue项目&#xff0c;2.开发一个组件。 3.注册成插件。 4.vite和package.json配置。5.发布到npm &#xff11;.创建一个vue项目 npm create vuelatest 生成了vue项目之后&#xff0c;得到了以下结构。 在src下创建个plugins目录。用于存放开发的…

【深度学习】LSTM、BiLSTM详解

文章目录 1. LSTM简介&#xff1a;2. LSTM结构图&#xff1a;3. 单层LSTM详解4. 双层LSTM详解5. BiLSTM6. Pytorch实现LSTM示例7. nn.LSTM参数详解 1. LSTM简介&#xff1a; LSTM是一种循环神经网络&#xff0c;它可以处理和预测时间序列中间隔和延迟相对较长的重要事件。LSTM通…

Queuing 表(buffer表)的优化实践 | OceanBase 性能优化实践

案例问题描述 该案例来自一个金融行业客户的问题&#xff1a;他们发现某个应用对一个数据量相对较小的表&#xff08;仅包含数千条记录&#xff09;访问时&#xff0c;频繁遇到性能下降的情况。为解决此问题&#xff0c;客户向我们求助进行分析。我们发现这张表有频繁的批量插…