消息队列-kafka-消息发送流程(源码跟踪)

news2024/11/15 14:01:40

官方网址

源码:https://kafka.apache.org/downloads
快速开始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

发送消息流程

在这里插入图片描述
主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。
Map集合:存储了主线程的消息。
Sender线程:真正的发送其实是sender去发送到broker中。

源码阅读

1 首先打开Producer.send()可以看到里面的内容

// 返回值是一个 Future 参数为ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定义了这些信息
// 主题
private final String topic;
// 分区
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 时间戳
private final Long timestamp;

2 发送之前的前置处理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
     // intercept the record, which can be potentially modified; this method does not throw exceptions
     // 这里给开发者提供了前置处理的勾子
     ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
     // 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的
     return doSend(interceptedRecord, callback);
 }

3 进入真正的发送逻辑Future doSend()

  • 由于是网络通信,所以我们要序列化,在这个函数里面就做了序列化的内容。
try {
     serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
 } catch (ClassCastException cce) {
     throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
             " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
             " specified in key.serializer", cce);
 }
 byte[] serializedValue;
 try {
     serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
 } catch (ClassCastException cce) {
     throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
             " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
             " specified in value.serializer", cce);
 }
  • 然后我们获取分区
// 然后这里又是一个策略者模式 也是由用户可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
   Integer partition = record.partition();
   return partition != null ?
           partition :
           partitioner.partition(
                   record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我们的RecordAccumulator,也就是先由主线程发送到了RecordAccumulator

// 也就是对图中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                 serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入

// 可以看到是一个链表实现的双向队列,也就是消息会按append的顺序写到 内存记录中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接着我们看,我们append了以后,会有一个判断去唤醒sender线程,见下面的注释

// 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据
if (result.batchIsFull || result.newBatchCreated) {
      log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
      // 唤醒sender 去发送数据
      this.sender.wakeup();
  }
// 实现了Runnable 所以我们去看一下RUN方法的逻辑
public class Sender implements Runnable 

好上来就是一个循环

while (running) {
    try {
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

接着进入runOnece方法,直接看核心逻辑

// 从RecordAccumulator 拿数据 然后发送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
      addToInflightBatches(batches);
// 中间省去了非核心逻辑
sendProduceRequests(batches, now);

如果继续跟踪的话最终是走到了selector.send()里面:

Send send = request.toSend(destination, header);
 InFlightRequest inFlightRequest = new InFlightRequest(
         clientRequest,
         header,
         isInternalRequest,
         request,
         send,
         now);
 this.inFlightRequests.add(inFlightRequest);
 selector.send(send);

6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {
          public void onComplete(ClientResponse response) {
              handleProduceResponse(response, recordsByPartition, time.milliseconds());
          }
      };
// 如果有返回
if (response.hasResponse()) {
          ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
          for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
              TopicPartition tp = entry.getKey();
              ProduceResponse.PartitionResponse partResp = entry.getValue();
              ProducerBatch batch = batches.get(tp);
              completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
          }
          this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
      } 

追踪到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
       // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
       produceFuture.set(baseOffset, logAppendTime, exception);

       // execute callbacks
       for (Thunk thunk : thunks) {
           try {
               if (exception == null) {
                   RecordMetadata metadata = thunk.future.value();
                   if (thunk.callback != null)
                       thunk.callback.onCompletion(metadata, null);
               } else {
                   if (thunk.callback != null)
                       thunk.callback.onCompletion(null, exception);
               }
           } catch (Exception e) {
               log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
           }
       }

       produceFuture.done();
   }

Thunk 这个其实就是我们在Append的时候的回调:
在这里插入图片描述
至此整个流程就完成了,从发送消息,到响应后回调我们的函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1490063.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

安装Proxmox VE虚拟机平台

PVE是专业的虚拟机平台&#xff0c;可以利用它安装操作系统&#xff0c;如&#xff1a;Win、Linux、Mac、群晖等。 1. 下载镜像 访问PVE官网&#xff0c;下载最新的PVE镜像。 https://www.proxmox.com/en/downloads 2. 下载balenaEtcher balenaEtcher用于将镜像文件&#…

【Vue3】3-6 : 仿ElementPlus框架的el-button按钮组件实

文章目录 前言 本节内容实现需求完整代码如下&#xff1a; 前言 上节,我们学习了 slot插槽&#xff0c;组件内容的分发处理 本节内容 本小节利用前面学习的组件通信知识&#xff0c;来完成一个仿Element Plus框架的el-button按钮组件实现。 仿造的地址&#xff1a;uhttps://…

docker pull 拉取失败,设置docker国内镜像

遇到的问题 最近在拉取nginx时&#xff0c;显示如下错误&#xff1a;Error response from daemon: Get “https://registry-1.docker.io/v2/”: net/http: request canceled (Client.Timeout exceeded while awaiting headers)。 这个的问题是拉取镜像超时&#xff0c;通过检索…

基于Golang客户端实现Nacos服务注册发现和配置管理

基于Golang客户端实现Nacos服务注册发现和配置管理 背景 最近需要把Golang实现的一个web项目集成到基于Spring Cloud Alibaba的微服务体系中&#xff0c;走Spring Cloud Gateway网关路由实现统一的鉴权入口。 软件版本 组件名称组件版本Nacos2.2.0Go1.21.0Ginv1.9.1Nacos-s…

项目部署发布

目录 上传数据库 修改代码中的数据源配置 修改配置文件中的日志级别和日志目录 打包程序 ​编辑​编辑 上传程序 查看进程是否在运行 以及端口 云服务器开放端口(项目所需要的端口) 上传数据库 通过xshell控制服务器 创建目录 mkdir bit_forum 然后进入该目录 查看路…

【AI+CAD】(一)ezdxf 解析DXF文件

DXF文件格式理解 DXF文件格式是矢量图形文件格式&#xff0c;其详细说明了如何表示不同的图形元素。 DXF是一个矢量图形文件&#xff0c;它捕获CAD图形的所有元素&#xff0c;例如文本&#xff0c;线条和形状。更重要的是&#xff0c;DXF是用于在CAD应用程序之间传输数据的图形…

Java日志框架的纷争演进与传奇故事

在Java的世界里&#xff0c;日志记录是每一个应用不可或缺的部分。它帮助开发者了解应用的运行状态、调试问题、监控性能等。而在这背后&#xff0c;是一系列日志框架的发展与演进。今天&#xff0c;就让我们一起回顾这些日志框架的历史&#xff0c;探寻它们背后的故事。 1. Lo…

分布式数据库中全局自增序列的实现

自增序列广泛使用于数据库的开发和设计中&#xff0c;用于生产唯一主键、日志流水号等唯一ID的场景。传统数据库中使用Sequence和自增列的方式实现自增序列的功能&#xff0c;在分布式数据库中兼容Oracle和MySQL等传统数据库语法&#xff0c;也是基于Sequence和自增列的方式实现…

使用Visual Studio 2022 创建lib和dll并使用

概述&#xff1a;对于一个经常写javaWeb的人来说,使用Visual Studio似乎没什么必要&#xff0c;但是对于使用ffi的人来说&#xff0c;使用c或c编译器&#xff0c;似乎是必不可少的&#xff0c;下面我将讲述如何用Visual Studio 2022 来创建lib和dll&#xff0c;并使用。 静态库…

UNIapp实现局域网内在线升级

首先是UNIapp 生成apk 用Hbuilder 进行打包 可以从网站https://www.yunedit.com/reg?gotocert 使用自有证书&#xff0c;目测比直接使用云证书要快一些。 发布apk 网站 用IIS发布即可 注意事项中记录如下内容 第一、需要在 iis 的MiMe 中添加apk 的格式&#xff0c;否则无法…

Java架构之路-架构应全面了解的技术栈和工作域

有时候我在想这么简单简单的东西&#xff0c;怎么那么难以贯通。比如作为一个架构师可能涉及的不单单是技术架构&#xff0c;还包含了项目管理&#xff0c;一套完整的技术架构也就那么几个技术栈&#xff0c;只要花点心思&#xff0c;不断的往里面憨实&#xff0c;总会学的会&a…

UE4升级UE5 蓝图节点变更汇总(4.26/27-5.2/5.3)

一、删除部分 Ploygon Editing删除 Polygon Editing这个在4.26、4.27中的插件&#xff0c;在5.1后彻底失效。 相关的蓝图&#xff0c;如编辑器蓝图 Generate mapping UVs等&#xff0c;均失效。 如需相关功能&#xff0c;请改成Dynamic Mesh下的方法。 GetSupportedClass删…

在K8S集群中部署SkyWalking

1. 环境准备 K8S 集群kubectlhelm 2. 为什么要部署SkyWalking&#xff1f; 我也不道啊&#xff0c;老板说要咱就得上啊。咦&#xff0c;好像可以看到服务的各项指标&#xff0c;像SLA&#xff0c;Apdex这些&#xff0c;主要是能够进行请求的链路追踪&#xff0c;bug排查的利…

C向C++的一个过渡

思维导图 输入输出&#xff0c;以及基础头文件 在c语言中我们常用scanf("%d",&n);和printf("%d\n",n);来输出一些变量和常量&#xff0c;在C中我们可以用cin;和cout;来表示输入输出。 在C语言中输入输出有头文件&#xff0c;在C也有头文件&#xff0…

解放人力,提升品质:码垛输送机的工业应用与价值

在现代工业生产中&#xff0c;码垛输送机已成为许多企业自动化生产线上的关键设备。它不仅可以提高生产效率&#xff0c;降低人力成本&#xff0c;还能确保产品质量&#xff0c;并为企业带来许多其他方面的实际好处。 1. 提高生产效率&#xff1a; 快速码垛&#xff1a;码垛输…

蓝桥杯练习题——dp

五部曲&#xff08;代码随想录&#xff09; 1.确定 dp 数组以及下标含义 2.确定递推公式 3.确定 dp 数组初始化 4.确定遍历顺序 5.debug 入门题 1.斐波那契数 思路 1.f[i]&#xff1a;第 i 个数的值 2.f[i] f[i - 1] f[i - 2] 3.f[0] 0, f[1] 1 4.顺序遍历 5.记得特判 …

基于springboot+vue的医院药品管理系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

STM32CubeIDE基础学习-新建STM32CubeIDE基础工程

STM32CubeIDE基础学习-新建STM32CubeIDE基础工程 前言 有开发过程序的朋友都清楚&#xff0c;后面开发是不需要再新建工程的&#xff0c;一般都是在初学时或者有特殊需要的时候才需要新建项目工程的。 后面开发都是可以在这种已有的工程上添加相关功能就行&#xff0c;只要前…

Linux系统部署Discuz论坛并发布至公网随时随地可远程访问

目录 ​编辑 前言 1.安装基础环境 2.一键部署Discuz 3.安装cpolar工具 4.配置域名访问Discuz 5.固定域名公网地址 6.配置Discuz论坛 结语 作者简介&#xff1a; 懒大王敲代码&#xff0c;计算机专业应届生 今天给大家聊聊Linux系统部署Discuz论坛并发布至公网随时随地…

Qt+FFmpeg+opengl从零制作视频播放器-1.项目介绍

1.简介 学习音视频开发&#xff0c;首先从做一款播放器开始是比较合理的&#xff0c;每一章节&#xff0c;我都会将源码贴在最后&#xff0c;此专栏你将学习到以下内容&#xff1a; 1&#xff09;音视频的解封装、解码&#xff1b; 2&#xff09;Qtopengl如何渲染视频&#…