Kafka - 消息零丢失实战

news2025/4/12 23:10:37

Kafka消息0丢失实战

当你用Kafka处理业务时,是否担心过消息神秘失踪?下面将从SpringBoot整合实战出发,穿透生产者→Broker→消费者全链路

1、 消息丢失的三大场景

场景1:生产者自信发送

// 致命陷阱代码示例

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.ACKS_CONFIG, "0"); // 发后即忘
    configs.put(ProducerConfig.RETRIES_CONFIG, 0); // 不重试
    return new DefaultKafkaProducerFactory<>(configs);
}

// 异步发送忘记回调

kafkaTemplate.send("order-topic", orderId, json).addCallback(
    result -> logger.info("发送成功"),  // 成功日志
    ex -> logger.error("发送失败")     // 错误吞没
);

场景2:Broker的死亡错觉

# 危险配置示范
auto.create.topics.enable=true     # 自动创建主题埋雷
unclean.leader.election.enable=true # 允许脏选举
min.insync.replicas=1             # 单副本存活即工作

场景3:消费者的自信提交

// 问题配置

@KafkaListener(topics = "order-topic")
public void handle(Order order) {
    try {
        paymentService.process(order);  // 处理耗时操作
    } finally {
        // 没有手动提交偏移量!
    }
}

// 错误配置:自动提交间隔过长

spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=5000

2、生产者端的钢铁防线

1. 同步发送+重试策略(金融级防护)

@Bean
public KafkaTemplate<String, String> reliableKafkaTemplate() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.ACKS_CONFIG, "all"); // 必须所有副本确认
    configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
    configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 防止乱序
    configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));
}

// 发送模板

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("order-topic", key, value);
future.get(10, TimeUnit.SECONDS); // 同步等待确认

2. 事务消息(分布式事务防护)

@Bean
public KafkaTransactionManager<String, String> transactionManager() {
    return new KafkaTransactionManager<>(producerFactory());
}
@Transactional
public void processOrder(Order order) {
    paymentService.charge(order);
    kafkaTemplate.send("payment-topic", order.getId(), order.toJson());
    inventoryService.reduceStock(order); 
}

3. 监控指标看护

// 注册监控指标

metrics.addMetric("producer-error-rate", (tags) -> {
    return producer.metrics().get("record-error-rate").value();
});

3、Broker集群的堡垒计划

1. 存活确认矩阵

# broker关键配置
unclean.leader.election.enable=false    # 禁止脏选举
min.insync.replicas=2                  # 至少2个副本确认
default.replication.factor=3           # 默认3副本
log.flush.interval.messages=10000      # 刷盘策略
log.flush.interval.ms=1000

2. ISR机制源码解析

// Kafka源码片段(Partition.scala)

def inSyncReplicas = {
  val leaderLogEndOffset = localLogOrException.logEndOffset
  remoteReplicas.filter { replica =>
    replica.logEndOffset >= leaderLogEndOffset &&
    (time.milliseconds - replica.lastCaughtUpTimeMs) < replicaLagTimeMaxMs
  }
}

3. 磁盘防护策略

# 使用JBOD而不是RAID(Kafka最佳实践)
log.dirs=/data/kafka/log1,/data/kafka/log2,/data/kafka/log3
# 监控脚本示例
df -h | grep /data/kafka | awk '{if ($5 > 85) print "ALERT: "$6" usage over 85%"}'

4、消费者端的终极防御

1. 手动提交+死信队列

@KafkaListener(topics = "order-topic", groupId = "payment-group")
public void listen(
    ConsumerRecord<String, String> record,
    Acknowledgment ack,
    Consumer<String, String> consumer) {
    try {
        paymentService.process(record.value());
        ack.acknowledge(); // 手动提交
    } catch (Exception ex) {
        // 记录原始消息到死信队列
        kafkaTemplate.send("order-dlq", record.key(), record.value());
        // 重置偏移量到5秒前
        consumer.seek(record.topic(), record.partition(), record.offset() - 1);
    }
}

2. 消费者组反脆弱模式

# 关键配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.isolation.level=read_committed

3. 消费延迟监控

// 计算消费延迟

long lag = record.timestamp() - System.currentTimeMillis();
metrics.recordGauge("consumer.lag", lag);

5、全链路防护方案

1. 端到端校验设计

// 消息指纹校验

public class MessageWrapper {
    private String payload;
    private String checksum; // SHA256(payload + salt)
}

// 生产者端

String salt = "kafka-secure-2023";
String checksum = DigestUtils.sha256Hex(payload + salt);
template.send("topic", new MessageWrapper(payload, checksum));

// 消费者端

if (!DigestUtils.sha256Hex(message.getPayload() + salt).equals(message.getChecksum())) {
    throw new InvalidMessageException();
}

2. 混沌工程测试用例

@SpringBootTest
public class KafkaChaosTest {
    @Autowired
    private KafkaChaosRunner chaosRunner;
    @Test
    public void testNetworkPartition() {
        chaosRunner.networkPartition("kafka-broker1", Duration.ofMinutes(5));
        // 验证消息不丢失
    }
}

3. 消息轨迹追踪

// 使用OpenTelemetry实现

Span sendSpan = tracer.spanBuilder("kafka.send")
                     .setAttribute("message.key", key)
                     .startSpan();
try (Scope scope = sendSpan.makeCurrent()) {
    kafkaTemplate.send("topic", key, value);
} finally {
    sendSpan.end();
}

配置核查清单
✅ 生产者acks=all且开启幂等
✅ broker禁用unclean leader选举
✅ 消费者关闭自动提交
✅ 事务消息开启read_committed
✅ 监控Producer/Consumer/Broker关键指标

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2328340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++11】异步编程

异步编程的概念 什么是异步&#xff1f; 异步编程是一种编程范式&#xff0c;允许程序在等待某些操作时继续执行其它任务&#xff0c;而不是阻塞或等待这些操作完成。 异步编程vs同步编程&#xff1f; 在传统的同步编程中&#xff0c;代码按顺序同步执行&#xff0c;每个操作需…

论文阅读笔记:Denoising Diffusion Implicit Models (4)

0、快速访问 论文阅读笔记&#xff1a;Denoising Diffusion Implicit Models &#xff08;1&#xff09; 论文阅读笔记&#xff1a;Denoising Diffusion Implicit Models &#xff08;2&#xff09; 论文阅读笔记&#xff1a;Denoising Diffusion Implicit Models &#xff08…

UltraScale+系列FPGA实现 IMX214 MIPI 视频解码转HDMI2.0输出,提供2套工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的所有工程源码总目录----方便你快速找到自己喜欢的项目我这里已有的 MIPI 编解码方案我已有的4K/8K视频处理解决方案 3、详细设计方案设计框图硬件设计架构FPGA开发板IMX214 摄像头MIPI D-PHYMIPI CSI-2 RX SubsystemBayer…

BUUCTF-web刷题篇(9)

18.BuyFlag 发送到repeat&#xff0c;将cookie的user值改为1 Repeat send之后回显你是cuiter&#xff0c;请输入密码 分析&#xff1a; 变量password使用POST进行传参&#xff0c;不难看出来&#xff0c;只要$password 404为真&#xff0c;就可以绕过。函数is_numeric()判…

MySQL-- 函数(单行函数): 日期和时间函数

目录 1,获取日期、时间 2,日期与时间戳的转换 3,获取月份、星期、星期数、天数等函数 4,日期的操作函数 5,时间和秒钟转换的函数 6,计算日期和时间的函数 7,日期的格式化与解析 1,获取日期、时间 CURDATE() &#xff0c;CURRENT_DATE() 返回…

DeepSeek真的超越了OpenAI吗?

DeepSeek 现在确实很有竞争力&#xff0c;但要说它完全超越了 OpenAI 还有点早&#xff0c;两者各有优势。 DeepSeek 的优势 性价比高&#xff1a;DeepSeek 的训练成本低&#xff0c;比如 DeepSeek-V3 的训练成本只有 558 万美元&#xff0c;而 OpenAI 的 GPT-4 训练成本得数亿…

Node 22.11使用ts-node报错

最近开始学ts&#xff0c;发现使用ts-node直接运行ts代码的时候怎么都不成功&#xff0c;折腾了一番感觉是这个node版本太高还不支持&#xff0c; 于是我找了一个替代品tsx npm install tsx -g npx tsx your-file.ts -g代表全局安装&#xff0c;也可以开发环境安装&#xff0…

LabVIEW中VISA Write 与 GPIB Write的差异

在使用 LabVIEW 与 GPIB 设备通讯时&#xff0c;VISA Write Function 和 GPIB Write Function 是两个常用的函数&#xff0c;它们既有区别又有联系。 一、概述 VISA&#xff08;Virtual Instrument Software Architecture&#xff09;是一种用于仪器编程的标准 I/O 软件库&…

牛客练习题——素数(质数)

质数数量 改题目需要注意的是时间 如果进行多次判断就会超时&#xff0c;这时需要使用素数筛结合标志数组进行对所有数据范围内进行判断&#xff0c;而后再结合前缀和将结果存储到数组中&#xff0c;就可以在O(1)的时间复杂度求出素数个数。 #include<iostream>using nam…

使用MQTTX软件连接阿里云

使用MQTTX软件连接阿里云 MQTTX软件阿里云配置MQTTX软件设置 MQTTX软件 阿里云配置 ESP8266连接阿里云这篇文章里有详细的创建过程&#xff0c;这里就不再重复了&#xff0c;需要的可以点击了解一下。 MQTTX软件设置 打开软件之后&#xff0c;首先点击添加进行创建。 在阿…

23种设计模式-行为型模式-责任链

文章目录 简介问题解决代码核心改进点&#xff1a; 总结 简介 责任链是一种行为设计模式&#xff0c;允许你把请求沿着处理者链进行发送。收到请求后&#xff0c;每个处理者均可对请求进行处理&#xff0c;或将其传递给链上的下个处理者。 问题 假如你正在开发一个订单系统。…

git commit Message 插件解释说明

- feat - 一项新功能 - fix - 一个错误修复 - docs - 仅文档更改 - style - 不影响代码含义的更改&#xff08;空白、格式化、缺少分号等&#xff09; - refactor - 既不修复错误也不添加功能的代码更改 - perf - 提高性能的代码更改 - build - 影响构建系统或外部依赖项…

推荐系统(二十一):基于MaskNet的商品推荐CTR模型实现

MaskNet 是微博团队 2021 年提出的 CTR 预测模型,相关论文:《MaskNet: Introducing Feature-Wise Multiplication to CTR Ranking Models by Instance-Guided Mask》。MaskNet 通过掩码自注意力机制,在推荐系统中实现了高效且鲁棒的特征交互学习,特别适用于需处理长序列及噪…

OpenCV 从入门到精通(day_04)

1. 绘制图像轮廓 1.1 什么是轮廓 轮廓是一系列相连的点组成的曲线&#xff0c;代表了物体的基本外形。相对于边缘&#xff0c;轮廓是连续的&#xff0c;边缘不一定连续&#xff0c;如下图所示。其实边缘主要是作为图像的特征使用&#xff0c;比如可以用边缘特征可以区分脸和手…

多模态学习(八):2022 TPAMI——U2Fusion: A Unified Unsupervised Image Fusion Network

论文链接&#xff1a;https://ieeexplore.ieee.org/stamp/stamp.jsp?tp&arnumber9151265 目录 一.摘要 1.1 摘要翻译 1.2 摘要解析 二.Introduction 2.1 Introduciton翻译 2.2 Introduction 解析 三. related work 3.1 related work翻译 3.2 relate work解析 四…

JavaEE-0403学习记录

通过前期准备后&#xff0c;项目已经能够成功运行&#xff1a; 1、在文件UserMapper.java中添加如下代码&#xff1a; List<User> selectUSerByIdDynamic(User user); 2、在文件UserMapper.xml中添加如下代码&#xff1a; <select id"selectUSerByIdDynamic&quo…

图像处理:使用Numpy和OpenCV实现傅里叶和逆傅里叶变换

文章目录 1、什么是傅里叶变换及其基础理论 1.1 傅里叶变换 1.2 基础理论 2. Numpy 实现傅里叶和逆傅里叶变换 2.1 Numpy 实现傅里叶变换 2.2 实现逆傅里叶变换 2.3 高通滤波示例 3. OpenCV 实现傅里叶变换和逆傅里叶变换及低通滤波示例 3.1 OpenCV 实现傅里叶变换 3.2 实现逆傅…

RNN模型与NLP应用——(7/9)机器翻译与Seq2Seq模型

声明&#xff1a; 本文基于哔站博主【Shusenwang】的视频课程【RNN模型及NLP应用】&#xff0c;结合自身的理解所作&#xff0c;旨在帮助大家了解学习NLP自然语言处理基础知识。配合着视频课程学习效果更佳。 材料来源&#xff1a;【Shusenwang】的视频课程【RNN模型及NLP应用…

使用YoloV5和Mediapipe实现——上课玩手机检测(附完整源码)

目录 效果展示 应用场景举例 1. 课堂或考试监控&#xff08;看到这个学生党还会爱我吗&#xff09; 2. 驾驶安全监控&#xff08;防止开车玩手机&#xff09; 3. 企业办公管理&#xff08;防止工作时间玩手机&#xff09; 4. 监狱、戒毒所、特殊场所安保 5. 家长监管&am…

XT-912在热交换站的应用

热网监控需求 随着国民经济的不断进步和人民生活水平日益提高&#xff0c;社会对环境的要求越来越高。近年来国家大力提倡城镇集中供热&#xff0c;改变原来各单位、各片区自己供热、单独建立锅炉房给城市带来的污染&#xff0c;由城市外围的一个或者多个热源厂提供热源&#…