Kafka生产者消息发送流程原理及源码分析

news2025/1/12 10:10:00

Kafka是一个分布式流处理平台,它能够以极高的吞吐量处理数据。在Kafka中,生产者负责将消息发送到Kafka集群,而消费者则负责从Kafka集群中读取消息。本文将探讨Kafka生产者消息发送流程的细节,包括消息的序列化、分区分配、记录提交等关键步骤。

先看一个生产者发送消息的代码样例

public class MyProducer1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Map<String, Object> configs = new HashMap<>();
        // 指定初始连接用到的broker地址
        configs.put("bootstrap.servers", "node164:9092");
        // 指定key的序列化类
        configs.put("key.serializer", IntegerSerializer.class);
        // 指定value的序列化类
        configs.put("value.serializer", StringSerializer.class);
        //borker集群消息持久化控制
        configs.put("acks", "all");
        //重试次数
        configs.put("reties", "3");
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
        
        // 用于设置用户自定义的消息头字段
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                "test_topic",
                0,
                0,
                "hello world 0",
                headers
        );

        // 消息异步确认
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("消息的主题:" + metadata.topic());
                    System.out.println("消息的分区号:" + metadata.partition());
                    System.out.println("消息的偏移量:" + metadata.offset());
                } else {
                    System.out.println("异常消息:" + exception.getMessage());
                }
            }
        });

        // 关闭生产者
        producer.close();
    }
}

通过跟踪producer.send源码可知生产者发送消息的大体流程如下图,RecordAccumulator的消息发送到brokers实际上由Sender线程处理,下图暂时忽略,先看producer主线程处理的一些细节。

  • KafkaProducer构造函数根据客户端参数初始化拦截器、序列化器、分区器,创建Sender守护线程。
  • 调用send函数发送消息时,其内部使用异步消息发送,消息经过拦截器、序列化器、分区器后缓存到缓冲区。
  • 批次发送的条件为:缓冲区数据⼤⼩达到batch.size或者linger.ms达到上限。
  • 缓冲区消息发送到指定分区,落盘到broker。如果发送失败,客户端将根据设置的重试参数进行重试,如果超过了重试次数,抛出异常。
  • 发送成功,返回RecordMetadata元数据到客户端。如果是同步调用将阻塞等待元数据返回,如果是异步调用将通过Callback接口进行回调返回元数据

生产者拦截器

KafkaProducer调用send方法后,如果有设置拦截器,会先经过拦截器,默认是不会经过任何拦截器的,除非客户端配置了拦截器(interceptor.classes参数),send函数如下

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

可见,拦截器列表会被首先执行,而拦截器的初始化则是在KafkaProducer的 构造函数中,部分源码如下

List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

可见,拦截器是通过客户端配置的ProducerConfig.INTERCEPTOR_CLASSES_CONFIG来初始化的,拦截器必须实现ProducerInterceptor接口。

public interface ProducerInterceptor<K, V> extends Configurable {

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);


    public void onAcknowledgement(RecordMetadata metadata, Exception exception);


    public void close();
}

拦截器接口共有三个接口,第一个onSend接口把ProducerRecord直接传了进来,我们可以在实现接口时,对原消息进行统一处理,比如添加一些业务相关的头部信息等。onAcknowledgement接口则可以在确认消息发送成功后做一些操作,最后close接口则可以在拦截器关闭时清理一些资源。

如需要自定义拦截器则直接实现ProducerInterceptor接口,实现相关方法,在客户端进行配置即可,客户端配置示例:

 // 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开
  configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.xxx.CustomInterceptorOne,com.xxx.CustomInterceptorTwo");

生产者序列化器

拦截器处理完后,将进入到doSend方法,在发送消息前,首先会根据客户端配置的序列化器对key和value进行序列化。

序列化接口如下:

public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    void configure(Map<String, ?> configs, boolean isKey);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Close this serializer.
     *
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    void close();
}

在Kafka中,消息可以是任何类型的数据,如字符串、JSON、二进制数据等。为了将这些数据存储到Kafka集群中,Kafka需要对它们进行序列化。Kafka提供了多种序列化器,如StringSerializer、JsonSerializer等。生产者可以根据自己的需求选择合适的序列化器来序列化消息。如果默认提供的序列化器仍未满足需求,实现上面的Serializer接口,然后在客户端配置自己的序列化器即可。通过接口可以看出,序列化器最终将key和value序列化成字节数组。

doSend方法使用序列化器的部分源码:

byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1813621.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【五】Linux软件仓库Yum源--SSH远程控制--SCP远程传输

RPM&#xff08;红帽软件包管理器&#xff09; RPM建立统一的数据库文件&#xff0c;记录软件信息并分析依赖关系。目前RPM的优势已经被公众所认可&#xff0c;使用范围也已不局限在红帽系统中了。常见RPM命令如下&#xff1a; 安装软件 rpm -ivh file…

数据结构(DS)学习笔记(二):数据类型与抽象数据类型

参考教材&#xff1a;数据结构C语言版&#xff08;严蔚敏&#xff0c;杨伟民编著&#xff09; 工具&#xff1a;XMind、幕布、公式编译器 正在备考&#xff0c;结合自身空闲时间&#xff0c;不定时更新&#xff0c;会在里面加入一些真题帮助理解数据结构 目录 1.1数据…

【DevOps】 什么是容器 - 一种全新的软件部署方式

目录 引言 一、什么是容器 二、容器的工作原理 三、容器的主要特性 四、容器技术带来的变革 五、容器技术的主要应用场景 六、容器技术的主要挑战 七、容器技术的发展趋势 引言 在过去的几十年里,软件行业经历了飞速的发展。从最初的大型机时代,到后来的个人电脑时代,…

neo4j 3.5.5版本创建新的数据库

neo4j 3.5.5版本创建新的数据库 1.找到neo4j的conf文件 点进去 2.点击neo4j.conf 选择记事本打开 3.把graph.db换成自己想要创建的数据库名称 4.打开neo4j服务 出现新的数据库

信息系统项目管理师0151:输出(9项目范围管理—9.4收集需求—9.4.3输出)

点击查看专栏目录 文章目录 9.4.3 输出9.4.3 输出 需求文件 需求文件描述各种单一需求将如何满足项目相关的业务需求。一开始可能只有高层级的需求,然后随着有关需求信息的增加而逐步细化。只有明确的(可测量和可测试的)、可跟踪的、完整的、相互协调的,且主要干系人愿意认…

FreeRTOS学习笔记-基于stm32(14)内存管理

一、FreeRTOS 内存管理简介 FreeRTOS有两种方法来创建任务&#xff0c;队列&#xff0c;信号量等&#xff0c;一种动态一种静态。静态方法需要手动定义任务堆栈。使用动态内存管理的时候 FreeRTOS 内核在创建任务、队列、信号量的时候会动态的申请 RAM。 我们在移植FreeRTOS时可…

采用java+springboot+vue+uniapp自主研发的智慧城管源码,城管综合执法平台源代码

智慧城管执法平台源码&#xff0c;PCAPP端全套源码&#xff0c;城管综合执法系统源码。 智慧城管系统拥有自主版权&#xff0c;项目落地案例&#xff0c;有演示&#xff0c;适合二次开发项目使用。 智慧城管执法系统旨在提高城市管理效率&#xff0c;涵盖了城市管理中的很多业务…

RabbitMQ从入门到入土

同步与异步 同步调用 优势&#xff1a; 时效性强&#xff0c;等到结果后就返回 问题&#xff1a; 扩展性差 性能下降 级联失败问题 异步调用 优势&#xff1a; 耦合度低&#xff0c;扩展性强 无需等待&#xff0c;性能好 故障隔离&#xff0c;下游服务故障不影响上游 缓…

探索乡村振兴新模式:发挥科技创新在乡村振兴中的引领作用,构建智慧农业体系,助力美丽乡村建设

随着科技的不断进步&#xff0c;乡村振兴工作正迎来前所未有的发展机遇。科技创新作为推动社会发展的重要力量&#xff0c;在乡村振兴中发挥着越来越重要的引领作用。本文旨在探讨如何发挥科技创新在乡村振兴中的引领作用&#xff0c;通过构建智慧农业体系&#xff0c;助力美丽…

汉语翻译藏语的软件,有3款宝藏软件!

在数字化飞速发展的今天&#xff0c;语言不再是沟通的障碍。对于想要学习藏语或需要与藏区人民交流的朋友们来说&#xff0c;一款优质的汉语翻译藏语的软件无疑是一大福音。那么&#xff0c;市面上究竟有哪些值得推荐的汉语翻译藏语的软件呢&#xff1f;接下来&#xff0c;就让…

PostgreSQL 快速入门与实战

1、概述 前面2篇博客给大家详细的介绍了PostgreSQL的安装和配置&#xff0c;本篇文章就带着大家一起学习一下PostgreSQL的用法&#xff0c;主要内容包括 基本的数据库操作、用户管理、数据备份、SCHEMA(模式)以及和MySQL的区别。 2、数据库基本操作 PostgreSQL是严格遵守SQL规…

C# Winform内嵌窗体(在主窗体上显示子窗体)

在开发Winform项目中&#xff0c;经常会要切换不同的窗体。通常程序都有一个主窗体&#xff0c;在切换窗体时往往需要关闭其他子窗体&#xff0c;这个实例就来介绍MDI主窗体内嵌子窗体的实现方法。 MDI主窗体要设置一个比较重要的属性&#xff0c;IsMdiContainertrue。子窗体的…

boost asio异步服务器(3)增加发送队列实现全双工通信

增加发送节点 构造发送节点&#xff0c;管理发送数据。发送节点的类如下。 这个发送节点用于保证发送和接收数据的有效性。 增加发送队列 前边实现的是一个简单的echo服务器&#xff0c;也就是服务器将收到的内容发送给对应的客户端。但是在实际的服务器设计中&#xff0c;服务…

苹果WWDC 2024 带来的 AI 风暴:从生产力工具到个人助理,AI 将如何融入我们的生活?

2024年6月5日&#xff0c;苹果WWDC 2024全球开发者大会如约而至&#xff0c;带来了众多令人兴奋的新功能和新产品。其中&#xff0c;AI 技术的全面融入无疑是最引人注目的亮点。从 iOS、iPadOS 到 macOS&#xff0c;再到 Siri 和开发者工具&#xff0c;苹果正在将 AI 融入到其生…

数字孪生技术推动希腊水务系统的技术进步

OpenFlows 提供的数字孪生技术将科扎尼供水渗漏的响应时间缩短了 50% 引领希腊供水管理改革 新冠疫情之后&#xff0c;希腊制定国家经济复苏计划&#xff0c;旨在推动能源改革、数字化和现代化&#xff0c;作为计划的一部分&#xff0c;希腊正试图实现可持续的给排水管理&…

Qt | openSSL将TCP数据进行不对称(RSA)加密传输-windows平台实操(可行)

01、windows平台工具准备 QtQt5.14.2openSSL下载(选择适合自己的版本即可)https://slproweb.com/products/Win32OpenSSL.htmlTCP调试助手调试助手02、简介 首先简单介绍一下openssl。接着描述如何在windo

龙芯+RT-Thread+LVGL实战笔记(36)——密码锁完善

【写在前面】不知不觉中,又临近学期末了。这个学期,因为一些特殊原因,一直没怎么更新本教程,而且不得已上调了本教程的价格,在此笔者深表歉意。另一方面,自己带的学生发挥不佳,很遗憾未能闯进国赛,为此笔者也郁闷了相当长一段时间。事已至此,也只能慢慢释然,来年再战…

AI网络爬虫:批量爬取AI导航网站Futurepedia数据

Futurepedia致力于使AI技术对各行各业的专业人士更加可理解和实用&#xff0c;提供全面的AI网站和工具目录、易于遵循的指南、每周新闻通讯和信息丰富的YouTube频道&#xff0c;简化AI在专业实践中的整合。如何把Futurepedia上的全部AI网站数据爬取下来呢&#xff1f; 网站一页…

[大模型]LLaMA3-8B-Instruct langchain 接入

环境准备 在 Autodl 平台中租赁一个 3090 等 24G 显存的显卡机器&#xff0c;如下图所示镜像选择 PyTorch-->2.1.0-->3.10(ubuntu22.04)-->12.1 接下来打开刚刚租用服务器的 JupyterLab&#xff0c;并且打开其中的终端开始环境配置、模型下载和运行演示。 pip 换…

一款优秀的下载和共享工具

一、简介 1、它以舒适和快速的方式下载Internet文件&#xff0c;同时支持断点续传和嗅探视频音频的功能。 它具有站点抓取、批量下载队列和计划任务下载等功能&#xff0c;可以接管所有浏览器的下载任务&#xff0c;包括Edge&#xff0c;Firefox和Chrome等主流浏览器。 对于用…