【kafka系列】生产者

news2025/2/20 4:30:08

目录

发送流程

1. 流程逻辑分析

阶段一:主线程处理

阶段二:Sender 线程异步发送

核心设计思想

2. 流程

关键点总结

重要参数

一、核心必填参数

二、可靠性相关参数

三、性能优化参数

四、高级配置

五、安全性配置(可选)

六、错误处理与监控

典型配置示例

关键注意事项


发送流程

  • 序列化与分区:消息通过Partitioner选择目标分区(默认轮询或哈希),序列化后加入RecordAccumulator缓冲区。
  • 批次合并Sender线程将同一分区的消息合并为ProducerBatch,减少网络请求(源码见Sender.run()方法)。
  • 发送至Broker:通过NetworkClient异步发送,Broker的LogAppendTime处理写入请求。
  • ACK机制:根据acks配置(0/1/all)等待Broker确认,通过Metadata类更新分区元数据

1. 流程逻辑分析

Kafka 生产者发送消息的核心流程分为 主线程处理Sender 线程异步发送 两个阶段,具体步骤如下:


阶段一:主线程处理
  1. 创建 ProducerRecord
    • 用户调用 producer.send(ProducerRecord),指定 Topic、Key、Value 和可选的分区或时间戳。
  1. 选择分区(Partition)
    • 若未指定分区,根据以下规则选择:
      • 有 Key:对 Key 哈希取模(hash(key) % 分区数),确保相同 Key 的消息进入同一分区。
      • 无 Key:默认使用粘性分区策略(Sticky Partitioning,Kafka 2.4+),在批次填满或超时前发送到同一分区,提升性能。
  1. 序列化(Serialize)
    • 使用配置的 key.serializervalue.serializer 对 Key 和 Value 序列化(如 StringSerializerByteArraySerializer)。
  1. 追加到缓冲区(RecordAccumulator)
    • 将消息按 Topic-Partition 分组,存入 RecordAccumulator 的批次(Batch)中。
    • 批次策略
      • batch.size:批次大小阈值(默认 16KB),达到阈值立即发送。
      • linger.ms:批次等待时间(默认 0ms),超时后发送未满批次。

阶段二:Sender 线程异步发送
  1. Sender 线程拉取批次
    • Sender 线程定期检查缓冲区,将满足条件的批次(已满或超时)封装为 ProducerRequest
  1. 构建请求并发送到 Broker
    • 根据分区的 Leader 副本所在 Broker,将请求发送到对应的节点。
    • 关键配置
      • acks:控制消息持久化确认级别:
        • 0:不等待确认(可能丢失数据)。
        • 1:等待 Leader 确认(默认)。
        • all:等待所有 ISR 副本确认(最高可靠性)。
      • max.in.flight.requests.per.connection:控制单个 Broker 的未确认请求数(默认 5)。
  1. 处理 Broker 响应
    • 成功:触发用户设置的 Callback 回调,并释放批次内存。
    • 失败
      • 可重试错误(如网络抖动、Leader 切换):根据 retries(默认 0)和 retry.backoff.ms(默认 100ms)重试。
      • 不可重试错误(如消息过大):直接触发回调并抛出异常。

核心设计思想
  • 异步批处理:通过缓冲区合并小消息,减少网络 I/O 次数。
  • 零拷贝优化:使用 sendfile 系统调用提升网络传输效率。
  • 高可靠性:通过重试机制和 acks=all 确保消息不丢失。

2. 流程


关键点总结

  1. 分区选择:优先使用 Key 哈希或粘性分区策略,保证消息顺序性和吞吐量。
  2. 批次优化:通过 batch.sizelinger.ms 平衡延迟与吞吐。
  3. 可靠性保障:通过 acksretries 配置确保消息持久化。
  4. 异步处理:主线程与 Sender 线程解耦,避免阻塞用户逻辑。

重要参数

以下是 Kafka 生产者(Producer)在日常开发中的 常见配置参数 及其作用,按功能分类整理成表格:


一、核心必填参数

参数名

默认值

说明

bootstrap.servers

Kafka 集群地址列表(逗号分隔,如 host1:9092,host2:9092

)。

key.serializer

Key 的序列化类(如 org.apache.kafka.common.serialization.StringSerializer

)。

value.serializer

Value 的序列化类(同上)。


二、可靠性相关参数

参数名

默认值

说明

acks

1

消息持久化确认机制:

0:不等待确认(可能丢失数据)。 1:等待 Leader 确认(默认)。all:等待所有 ISR 副本确认(最高可靠性)。

retries

0

发送失败后的重试次数(建议设为 Integer.MAX_VALUE

配合 delivery.timeout.ms

)。

enable.idempotence

false

是否启用幂等性(true时保证消息不重复,需配合 acks=all

retries>0)。

max.in.flight.requests.per.connection

5

单个 Broker 的未确认请求数。若启用幂等性,建议设为 1

以保证顺序。


三、性能优化参数

参数名

默认值

说明

linger.ms

0

消息在缓冲区等待时间(毫秒),增大可提升吞吐量(但增加延迟)。

batch.size

16384

(16KB)

单个批次的大小阈值,达到阈值后立即发送。

buffer.memory

33554432

(32MB)

生产者缓冲区的总内存大小。

compression.type

none

消息压缩算法(gzip

snappy

lz4

zstd

),减少网络带宽占用。


四、高级配置

参数名

默认值

说明

request.timeout.ms

30000

(30秒)

生产者等待 Broker 响应的超时时间。

max.block.ms

60000

(60秒)

生产者缓冲区满或元数据不可用时的阻塞时间(超时抛异常)。

partitioner.class

默认轮询/哈希策略

自定义分区策略(实现 Partitioner

接口)。


五、安全性配置(可选)

参数名

默认值

说明

security.protocol

PLAINTEXT

安全协议(如 SSL

SASL_SSL

)。

ssl.keystore.location

SSL 证书路径(客户端认证时需配置)。

sasl.mechanism

SASL 认证机制(如 PLAIN

SCRAM-SHA-256

)。


六、错误处理与监控

参数名

默认值

说明

interceptor.classes

生产者拦截器(实现 ProducerInterceptor

接口),用于监控或修改消息。

metrics.sample.window.ms

30000

(30秒)

性能指标采样窗口时间。


典型配置示例

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 10);
props.put("linger.ms", 20);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
props.put("enable.idempotence", "true");

关键注意事项

  1. 可靠性 vs 性能
    • acks=allenable.idempotence=true 提高可靠性,但可能降低吞吐量。
    • 增大 batch.sizelinger.ms 可提升吞吐量,但增加延迟。
  1. 幂等性限制
    • 需 Kafka 0.11+ 版本支持,且 max.in.flight.requests=1(或 Kafka 2.0+ 允许 5)。
  1. 监控与调优
    • 通过 metrics 和拦截器监控生产者性能,动态调整参数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2299449.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity 获取独立显卡数量

获取独立显卡数量 导入插件包打开Demo 运行看控制台日志 public class GetGraphicCountDemo : MonoBehaviour{public int count;// Start is called before the first frame updatevoid Start(){count this.GetIndependentGraphicsDeviceCount();}}

Deepseek R1模型本地化部署+API接口调用详细教程:释放AI生产力

文章目录 前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装ollama2部署DeepSeek R1模型删除已存在模型,以7b模型为例 三、DeepSeek API接口调用Cline配置 前言 随着最近人工智能 DeepSeek 的爆火,越来越多的技术大佬们开始关注如…

Mac ARM 架构的命令行(终端)中,删除整行的快捷键是:Ctrl + U

在 Mac ARM 架构的命令行(终端)中,删除整行的快捷键是: Ctrl U这个快捷键会删除光标所在位置到行首之间的所有内容。如果你想删除光标后面的所有内容,可以使用: Ctrl K这两个快捷键可以帮助你快速清除当…

用pytorch实现一个简单的图片预测类别

前言: 在阅读本文之前,你需要了解Python,Pytorch,神经网络的一些基础知识,比如什么是数据集,什么是张量,什么是神经网络,如何简单使用tensorboard,DataLoader。 本次模型训练使用的是…

深度学习框架探秘|TensorFlow:AI 世界的万能钥匙

在人工智能(AI)蓬勃发展的时代,各种强大的工具和框架如雨后春笋般涌现,而 TensorFlow 无疑是其中最耀眼的明星之一。它不仅被广泛应用于学术界的前沿研究,更是工业界实现 AI 落地的关键技术。今天,就让我们…

Linux 服务器部署deepseek

把手教你在linux服务器部署deepseek,打造专属自己的数据库知识库 正文开始 第一步:安装Ollama 打开官方网址:https://ollama.com/download/linux 下载Ollama linux版本 复制命令到linux操作系统执行 [rootpostgresql ~]# curl -fsSL http…

DeepSeek、Kimi、文心一言、通义千问:AI 大语言模型的对比分析

在人工智能领域,DeepSeek、Kimi、文心一言和通义千问作为国内领先的 AI 大语言模型,各自展现出了独特的特点和优势。本文将从技术基础、应用场景、用户体验和价格与性价比等方面对这四个模型进行对比分析,帮助您更好地了解它们的特点和优势。…

CSDN、markdown环境下如何插入各种图(流程图,时序图,甘特图)

流程图 横向流程图 mermaid graph LRA[方形] --> B{条件a}B -->|满足| C(圆角)B -->|不满足| D(圆角)C --> E[输出结果1]D --> E效果图: 竖向流程图 mermaid graph TDC{条件a} --> |a1| A[方形]C --> |a2| F[竖向流程图]A --> B(圆角)B …

unity学习40:导入模型的 Animations文件夹内容,动画属性和修改动画文件

目录 1 Animations文件夹内容 2 每个模型文件的4个标签 3 model 4 rig 动画类型 5 Animation 5.1 新增动画和修改动画 5.2 限制动画某个轴的变化,烘焙 6 material 材料 1 Animations文件夹内容 下面有很多文件夹每个文件夹都是不同的动作模型每个文件夹下…

web第三次作业

弹窗案例 1.首页代码 <!DOCTYPE html><html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>综合案例</title><st…

GMSL 实例1:当 MAX96717 遇上 MAX96724,打通 Camera 视频数据传输

新年伊始&#xff0c;继 Deepseek 在 AI 圈掀起风波之后。比亚迪在2月10日发布会上重磅官宣&#xff1a;全系车型将搭载自研的高阶智驾系统“天神之眼”&#xff0c;覆盖从10万元级入门车型到高端豪华车型的所有范围。此举如一颗重磅炸弹投向当前一卷再卷的新能源汽车赛道&…

DeepSeek 助力 Vue 开发:打造丝滑的侧边栏(Sidebar)

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 Deep…

基于opencv的 24色卡IQA评测算法源码-可完全替代Imatest

1.概要 利用24色卡可以很快的分析到曝光误差&#xff0c;白平衡误差&#xff0c;噪声&#xff0c;色差&#xff0c;饱和度&#xff0c;gamma值。IQA或tuning工程一般用Imatest来手动计算&#xff0c;不便于产测部署&#xff0c;现利用opencv实现了imatest的全部功能&#xff0c…

数据结构与算法之排序算法-(计数,桶,基数排序)

排序算法是数据结构与算法中最基本的算法之一&#xff0c;其作用就是将一些可以比较大小的数据进行有规律的排序&#xff0c;而想要实现这种排序就拥有很多种方法~ &#x1f4da; 非线性时间比较类&#xff1a; 那么我将通过几篇文章&#xff0c;将排序算法中各种算法细化的&a…

MATLAB图像处理:图像特征概念及提取方法HOG、SIFT

图像特征是计算机视觉中用于描述图像内容的关键信息&#xff0c;其提取质量直接影响后续的目标检测、分类和匹配等任务性能。本文将系统解析 全局与局部特征的核心概念&#xff0c;深入讲解 HOG&#xff08;方向梯度直方图&#xff09;与SIFT&#xff08;尺度不变特征变换&…

kibana es 语法记录 elaticsearch

目录 一、认识elaticsearch 1、什么是正向索引 2、什么是倒排索引 二、概念 1、说明 2、mysql和es的对比 三、mapping属性 1、定义 四、CRUD 1、查看es中有哪些索引库 2、创建索引库 3、修改索引库 4、删除索引库 5、新增文档 6、删除文档 5、条件查询 一、认识…

手写一个Java Android Binder服务及源码分析

手写一个Java Android Binder服务及源码分析 前言一、Java语言编写自己的Binder服务Demo1. binder服务demo功能介绍2. binder服务demo代码结构图3. binder服务demo代码实现3.1 IHelloService.aidl3.2 IHelloService.java&#xff08;自动生成&#xff09;3.3 HelloService.java…

【动态规划篇】:当回文串遇上动态规划--如何用二维DP“折叠”字符串?

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨ 文章所属专栏&#xff1a;动态规划篇–CSDN博客 文章目录 一.回文串类DP核心思想&#xff08;判断所有子串是否是回文…

Windows 安装 GDAL 并配置 Rust-GDAL 开发环境-1

Rust-GDAL 是 Rust 语言的 GDAL&#xff08;Geospatial Data Abstraction Library&#xff09; 绑定库&#xff0c;用于处理地理数据。由于 GDAL 依赖较多&#xff0c;在 Windows 上的安装相对复杂&#xff0c;本文档将介绍如何安装 GDAL 并配置 Rust-GDAL 的开发环境。 1. 检…

第1期 定时器实现非阻塞式程序 按键控制LED闪烁模式

第1期 定时器实现非阻塞式程序 按键控制LED闪烁模式 解决按键扫描&#xff0c;松手检测时阻塞的问题实现LED闪烁的非阻塞总结补充&#xff08;为什么不会阻塞&#xff09; 参考江协科技 KEY1和KEY2两者独立控制互不影响 阻塞&#xff1a;如果按下按键不松手&#xff0c;程序就…