Kafka_02_Producer详解

news2024/9/27 15:23:22

Kafka_02_Producer详解

  • Producer
    • ProducerRecord
    • Send&Close
    • 实现原理
      • ProducerInterceptor
      • Serializer
      • Partitioner
    • 事务

Producer

Producer(生产者): 生产并发送消息到Broker(推送)

  1. Producer是多线程安全的(建议通过池化以提高性能)
  2. Producer实例后可发送多条消息(可对应多个ProducerRecord)

// 0.9之后的版本是基于Java实现(之前是Scala实现)


Producer客户端发送消息大致逻辑:

  1. 配置Producer客户端参数并创建该Producer实例
  2. 构建需发送的消息
  3. 发送构建的消息
  4. 关闭实例

构造Producer必填的3个参数:

参数说明
bootstrap.servers引导程序的服务地址
格式: 地址1:端口1,地址N:端口N
(建议指定两个以上的Broker地址以保证稳定性, 且使用主机名形式)
key.serializer发送时对Key调用的序列化器
Broker仅能接受字节数组形式的消息byte[]
value.serializer发送时对Value调用的序列化器
Broker仅能接受字节数组形式的消息byte[]

// 序列化器必须以全限定名方式指定, Java的ProducerConfig类中包含所有的配置参数


ProducerRecord

ProducerRecord(构建消息): Producer每次发送的消息体

  1. ProducerRecord由多个属性构成(Topic和消息是基础属性)
  2. ProducerRecord有多个构造方法(指定属性的个数)
  3. 可根据不同需求创建特定ProducerRecord

ProducerRecord定义:

public class ProducerRecord<K, V> {
    private final String topic;      // Topic(必填)
    private final Integer partition; // Partition

    // 消息头部(0.11版本引入)
    // 指定与应用相关信息(可忽略)
    private final Headers headers;

    // 键(附加信息)
    // 其会用于计算Partition(二次归类)
    private final K key;

    // 值(消息体, 必填)
    // 为空则代表: 墓碑消息
    private final V value;

    // 消息时间戳
    // 细分为CreateTime(消息创建时间)和LogAppendTime(追加日志时间)
    private final Long timestamp;
    ......
}

Send&Close

Send(发送消息): Producer构建ProducerRecord之后发送给Broker

  1. 发送模式: 发后既忘(fire-and-forget)、同步(sync)、异步(async)
  2. 发送模式默认为异步(可通过获取返回值的方法以阻塞等待实现同步)
  3. 返回值通常为发送消息的元数据(Topic、Partition、偏移量和时间戳等)

Send()方法的定义:

public Future<RecordMetadata> send(ProducerRecord<K, V> record);

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
  1. 可通过Future的get()方法阻塞实现同步(返回RecordMetadata对象)
  2. Send()方法需配合try/catch(发送成功或发生异常)
  3. 发送导致的异常分为: 重试异常、不可重试异常

// 不可重试异常发生时会直接抛出并结束


常见的重试异常为:

可重试异常说明
NewworkException网络异常
LeaderNotAvailableException副本的leader不可用
(可能正在选举leader)
UnknownTopicOrPartitionExceptionTopic或Partition异常
NotEnoughReplicasException副本数量不足
NotCoordinatorException协调器异常

Send()方法中的Callback定义:

public interface Callback {
    void onCompletion(RecordMetadata var1, Exception var2);
}
  1. var1和var2参数互斥(两者必有个为null,后者代表异常)
  2. 若两个消息对相同Partition发送消息, 则按发送顺序调用Callback

Close(结束发送):回收Producer实例

  1. 发送结束后务必回收Producer实例(防止资源泄漏)
  2. Close默认会阻塞等待之前所有的发送请求完成之后再回收
  3. 可指定关闭的超时时间(超出该事件则强行回收, 不建议指定)

Close()方法的定义:

public void close();

public void close(long timeout, TimeUnit timeUnit);

实现原理

Producer的发送消息由两个线程完成:

  1. 主线程: 构建并处理消息后发送至RecordAccumulator
  2. Sender线程: 从RecordAccumulator获取消息, 并发送至Broker

如: Producer发送消息链路图

image

  1. RecordAccumulator: 双端队列缓存待发送ProducerBatch以减少网络影响
  2. ProducerBatch: 包含任意多个待发送的ProducerRecord(消息批次)
  3. Request: Kafka支持的各种请求协议
  4. InFlightRequests: 缓存已发送但未响应的Request

// Interceptor和Partitioner可选择性处理, 但必须经Serializer处理


Producer发送ProducerRecord的流程:

  1. 主线程将ProducerRecord加工处理后发送至RecordAccumulator尾部
  2. RecordAccumulator根据ProducerRecord分区选择对应的ProducerBatch
  3. RecordAccumulator根据内存复用原则和ProducerBatch大小决定是否新建
  4. Sender线程从RecordAccumulator头部获取ProducerBatch
  5. <分区, <Deque<ProducerBatch>>形式变为<Node, List<ProducerBatch>>
  6. 再根据各种协议请求转换为<Node, Request>形式
  7. 发送前以Map<nodeId, Deque<Request>>缓存Request
  8. 返回发送后的响应并清理InFlightRequests和RecordAccumulator

// 形式转换是为完成应用逻辑层到网络I/O层的转换


RecordAccumulator内存复用原则:

  1. RecordAccumulator通过java.io.ByteBufferBufferPool实现内存复用
  2. 若内存申请不超过指定大小, 则申请指定大小并放置于BufferPool
  3. 若内存申请超过指定大小, 则申请该内存并再使用后直接释放

// BufferPool可避免频繁的申请和释放内存


InFlightRequest中包含leastLoadedNode

  1. leastLoadedNode: 负载最小的Broker(未确认请求最少的)
  2. leastLoadedNode常用于元数据请求和Consumer组播协议的交互
  3. leastLoadedNode由Sender线程根据指定过期时间维护(主线程也可访问)

// 元数据: Broker、Topic、Partition、leader和follower副本所在的Broker等


如: Sender线程维护leatLoadedNode信息

  1. Sender线程检查元数据是否过期(默认5m)
  2. 超出则挑出leastLoadedNode, 向该Broker发送MetadataRequest请求
  3. 获取结果后将其结果存入InFlightRequests中, 并更新元数据的过期时间

ProducerInterceptor

ProducerInterceptor(拦截器): 消息发送前/后的进行的操作

  1. 不建议通过ProducerInterceptor修改topic、key和partition
  2. 可指定多个ProducerInterceptor(拦截链按配置时顺序执行)
  3. 可通过interceptor.classes参数指定Producer所使用的ProducerInterceptor

ProducerInterceptor定义:

public interface ProducerInterceptor<K, V> extends Configurable {
    // 发送前进行的操作
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    // 发送后被应答之后或失败进行的操作
    // 优先于Send()方法中定义的Callback前执行
    // 由于该方法运行于Producer的IO线程中, 应简洁
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    // 关闭拦截器
    public void close();
}

// 抛出的任何异常都会被记录到日志中, 并不再向上抛


Serializer

Serializer(序列化器): 将特定数据转换成字节数组(byte[])

  1. Broker仅能接受字节数组形式的数据(接收后会对其反序列化)
  2. Producer使用的Serializer需和Consumer使用的反序列化器需对应
  3. Producer指定Serializer时, 需通过全限定名方式指定(类的完整路径)

Serializer定义:

public interface Serializer<T> extends Closeable {
    // 配置序列化器
    // 常用于指定编码类型(默认UTF-8)
    void configure(Map<String, ?> configs, boolean isKey);

    // 执行序列化
    byte[] serialize(String topic, T data);

    // 关闭序列化器
    // 需保证幂等性
    void close();
}

// 不建议使用自定义Serializer或DeSerializer, 会增加耦合度


Partitioner

Partitioner(分区器): ProducerRecord分区的默认规则

  1. ProducerRecord中指定partition字段, 则略过Partitioner
  2. Partitioner的分区计算受Topic数量的影响(已分配的不受)
  3. 可通过partitioner.class参数指定Producer所使用的Partitioner

Partitioner定义:

public interface Partitioner extends Configurable, Closeable {
    // 计算并返回分区号
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    // 关闭分区器
    public void close();
}

public interface Configurable {
    // 获取配置信息并初始化数据
    void configure(Map<String, ?> configs);
}

默认的Partitioner: org.apache.kafka.clients.producer.internals.DefaultPartitioner

  1. close()方法默认为空
  2. 消息为null时, 则以轮询的方式分配可用的分区号
  3. 消息不为null时, 则进行Hash计算(MurmurHash2算法)

// 消息相同的情况下会写入相同的分区(存在消息互相覆盖的情况)


事务

事务(Transaction): Producer操作的最小原子单位(可跨Partition)

  1. 开启事务时, 必须也需开启幂等性(enable.idempotence)
  2. 开启事务时必须指定事务ID(若事务ID重复, 将结束被覆盖的事务并抛出异常)
  3. 只能使事务处于以下两种状态(否则将抛出异常): COMMIT、ABORT
  4. 事务开启后需关闭自动位移提交, 也不能位移消费

Producer中常用的事务方法:

// 初始化事务
void initTransactions();

// 开启事务
void beginTransaction();

// 事务内的位移提交
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)

// 提交事务
void commitTransaction();

// 终止事务(回滚)
void abortTransaction();

事务协调器(TransactionCoordinator): 负责事务中的各类操作

  1. 每个Producer都对应个事务协调器, 由其负责Producer中各类请求
  2. 事务协调器会将事务的信息都存储至内部Toipc的__transaction_state

如: 事务的执行流程

image

  1. 查找事务协调器: 找到事务协调器所在的Broker并建立连接(同时查找Partition)
  2. 获取PID: 通过InitProducerIdRequest请求获取该事务ID
  3. 执行事务: 通过各类请求处理Record并将数据存储至内部Topic
  4. 结束事务: 发送各类请求结束事务, 同时将事务信息存储至内部Topic和日志文件

Consumer的事务受以下限制:

  1. 采用日志压缩策略的Topic, 其Record可能被覆盖
  2. Consumer在消费时可能没有分配到事务内的所有Partition
  3. Record可能分布在Partition的多个LogSegment, 存在部分被清除的可能
  4. Consumer可通过位移提交/位移消费访问Record, 可能导致遗漏事务中的Record

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1364127.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Docker】配置阿里云镜像加速器

默认情况下&#xff0c;将来从docker hub &#xff08;https://hub.docker.com )上下载镜像太慢&#xff0c;所以一般配置镜像加速器。 没有账号的注册一个账号并登录 登录之后点击控制台 查看 cat /etc/docker/daemon.json

大学物理实验重点——电路暂态过程

RC串联电路的暂态过程&#xff1a; RLC串联电路的暂态过程&#xff1a; 三种解&#xff1a; 半衰期测量法测量时间常数&#xff1a; 测振荡周期&#xff1a; 不断增大电阻直至振荡凸起峰刚好消失&#xff0c;此时即为临界阻尼状态&#xff0c;记下电阻箱的阻值R&#xff0c;…

【微服务】springcloud集成skywalking实现全链路追踪

目录 一、前言 二、环境准备 2.1 软件环境 2.2 微服务模块 2.3 环境搭建 2.3.1 下载安装包 2.3.2 解压并启动服务 2.3.3 访问web界面 三、搭建springcloud微服务 3.1 顶层公共依赖 3.2 用户服务模块 3.2.1 准备测试使用数据库 3.2.2 添加依赖 3.2.3 添加配置文件 …

HDU 1205:吃糖果 ← 鸽巢原理

【题目来源】http://acm.hdu.edu.cn/showproblem.php?pid1205【题目描述】 HOHO&#xff0c;终于从Speakless手上赢走了所有的糖果&#xff0c;是Gardon吃糖果时有个特殊的癖好&#xff0c;就是不喜欢将一样的糖果放在一起吃&#xff0c;喜欢先吃一种&#xff0c;下一次吃另一…

cgo环境之-安装gcc mingw

下载 到官网下载&#xff1a; 官网 如果你是Windows arm 芯片&#xff0c;可以到这里下载 https://github.com/mstorsjo/llvm-mingw/releases

10款有趣的前端源码分享(附效果图及在线演示)

分享10款非常有趣的前端特效源码 其中包含css动画特效、js原生特效、svg特效以及小游戏等 下面我会给出特效样式图或演示效果图 但你也可以点击在线预览查看源码的最终展示效果及下载源码资源 自毁按钮动画特效 自毁按钮动画特效 点击打开盒子可以点击自毁按钮 进而会出现自毁…

【ESP32接入国产大模型之文心一言】

1. 怎样接入文心一言 视频讲解&#xff1a; 【ESP32接入国产大模型之文心一言】 随着人工智能技术的不断发展&#xff0c;自然语言处理领域也得到了广泛的关注和应用。在这个领域中&#xff0c;文心一言作为一款强大的自然语言处理工具&#xff0c;具有许多重要的应用价值。本…

数据结构——栈(Stack)

目录 1.栈的介绍 2.栈工程 2.1 栈的定义 2.1.1 单链表实现栈 2.1.2 数组实现栈 2.1.2.1 静态数组栈 2.1.2.2 动态数组栈 2.2 栈的函数接口 2.2.1 栈的初始化 2.2.2 栈的数据插入&#xff08;入栈&#xff09; 2.2.3 栈的数据删除&#xff08;出栈&#xff09; 2.2.…

【docker】centos 使用 Nexus Repository 搭建私有仓库

Nexus Repository 是一种流行的软件仓库管理工具&#xff0c;它可以帮助您搭建私有仓库&#xff0c;以便在内部网络或私有云环境中存储、管理和分发各种软件包和组件。 它常被用于搭建Maven的镜像仓库。本文演示如何用Nexus Repository搭建docker 私有仓库。 使用Nexus Repos…

【InternLM】Lagent智能体工具调用实践浦语·灵笔(InternLM-XComposer)图文理解创作Demo练习

目录 前言一、Lagent智能体工具1-1、什么是智能体&#xff1f;1-2、Lagent智能体 二、InternLM-XComposer&#xff08;图文理解创作模型介绍&#xff09;三、Lagent调用实践3-0、环境搭建3-1、创建虚拟环境3-2、导入所需要的包3-3、模型下载3-4、Lagent安装3-5、demo运行 四、I…

阿里云服务器固定带宽下载和上传速度对照表

阿里云服务器公网带宽上传和下载速度对照表&#xff0c;1M带宽下载速度是128KB/秒&#xff0c;为什么不是1M/秒&#xff1f;阿腾云atengyun.com分享阿里云服务器带宽1M、2M、3M、5M、6M、10M、20M、30M、50M、100M及200M等公网带宽下载和上传速度对照表&#xff0c;附带宽价格表…

性能测试之(九):JMeter关联

关联&#xff1a;当请求之间有依赖关系&#xff0c;比如下一个请求的入参是上一个请求返回的数据&#xff0c;这需要进行关联处理&#xff1b; 关联场景1&#xff1a;登录之后返回token&#xff0c;后续的请求需要带token; 常用的关联方法&#xff1a;&#xff08;在后置处理器…

[算法应用]dijkstra算法的应用

先看一眼原始dijkstra算法&#xff0c;参考自dijkstra算法C实现_c实现djikstra-CSDN博客 分为三步 找到当前最优的把当前最优的&#xff0c;不参与后面的更新逐个比较是否更新 dijkstra算法的应用 题目大概是要从图上找一条权值不减的路径&#xff0c;且要经过最多的点。 所以…

odoo17 | 模型之间的内联视图

前言 从商业角度来看&#xff0c;我们的房地产模块现在是有意义的。我们创建了特定的视图&#xff0c;添加了几个操作按钮和约束。然而&#xff0c;我们的用户界面仍然有点粗糙。我们想为列表视图添加一些颜色&#xff0c;并使一些字段和按钮有条件地消失。例如&#xff0c;当…

STM32F407ZGT6时钟源配置

1、26M外部时钟源 1、25M外部时钟源

四种方式实现[选择性注入SpringBoot接口的多实现类]

最近在项目中遇到两种情况&#xff0c;准备写个博客记录一下。 情况说明&#xff1a;Service层一个接口是否可以存在多个具体实现&#xff0c;此时应该如何调用Service&#xff08;的具体实现&#xff09;&#xff1f; 其实之前的项目中也遇到过这种情况&#xff0c;只不过我采…

【linux应用开发】进程通信总结——使用管道、消息队列、共享内存、信号量实现l进程通信的详细教程

文章目录 简介无名管道有名管道IPC key标识消息队列共享内存信号量 简介 进程间通信&#xff08;IPC, Inter-Process Communication&#xff09;是指在操作系统中&#xff0c;不同进程之间交换数据、信息和命令的过程。在一个多任务的操作系统中&#xff0c;多个进程可以同时运…

Python和Java环境搭建

小白搭建全流程 首先不建议装在C盘&#xff0c;一旦重置电脑&#xff0c;之前安装第三方包需要重新安装 relolver :解释器 1、Python解释器安装 资源包&#xff1a; 1、 python -version java -version–用于查看是否安装 where python whrer java–用于查看安装的位置【非常…

ARTrack 阅读记录

目录 环境配置与脚本编写 前向传播过程 网络结构 环境配置与脚本编写 按照官网执行并没有顺利完成&#xff0c;将yaml文件中的 pip 项 手动安装的 conda create -n artrack python3.9 # 启动该环境&#xff0c;并跳转到项目主目录路径下 astor0.8.1 configparser5.2.0 data…

(2023|NIPS,MUSE,掩蔽适配器,基于反馈的迭代训练)StyleDrop:任意风格的文本到图像生成

StyleDrop: Text-to-Image Generation in Any Style 公和众和号&#xff1a;EDPJ&#xff08;添加 VX&#xff1a;CV_EDPJ 或直接进 Q 交流群&#xff1a;922230617 获取资料&#xff09; 目录 0. 摘要 3. StyleDrop&#xff1a;文本到图像合成的风格调整 3.1 基础&#x…