消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

news2025/1/23 9:20:37

官方网址

源码:https://kafka.apache.org/downloads
快速开始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

发送消息流程

在这里插入图片描述
主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。
Map集合:存储了主线程的消息。
Sender线程:真正的发送其实是sender去发送到broker中。

源码阅读

1 首先打开Producer.send()可以看到里面的内容

// 返回值是一个 Future 参数为ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定义了这些信息
// 主题
private final String topic;
// 分区
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 时间戳
private final Long timestamp;

2 发送之前的前置处理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
     // intercept the record, which can be potentially modified; this method does not throw exceptions
     // 这里给开发者提供了前置处理的勾子
     ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
     // 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的
     return doSend(interceptedRecord, callback);
 }

3 进入真正的发送逻辑Future doSend()

  • 由于是网络通信,所以我们要序列化,在这个函数里面就做了序列化的内容。
try {
     serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
 } catch (ClassCastException cce) {
     throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
             " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
             " specified in key.serializer", cce);
 }
 byte[] serializedValue;
 try {
     serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
 } catch (ClassCastException cce) {
     throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
             " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
             " specified in value.serializer", cce);
 }
  • 然后我们获取分区
// 然后这里又是一个策略者模式 也是由用户可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
   Integer partition = record.partition();
   return partition != null ?
           partition :
           partitioner.partition(
                   record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我们的RecordAccumulator,也就是先由主线程发送到了RecordAccumulator

// 也就是对图中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                 serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入

// 可以看到是一个链表实现的双向队列,也就是消息会按append的顺序写到 内存记录中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接着我们看,我们append了以后,会有一个判断去唤醒sender线程,见下面的注释

// 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据
if (result.batchIsFull || result.newBatchCreated) {
      log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
      // 唤醒sender 去发送数据
      this.sender.wakeup();
  }
// 实现了Runnable 所以我们去看一下RUN方法的逻辑
public class Sender implements Runnable 

好上来就是一个循环

while (running) {
    try {
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

接着进入runOnece方法,直接看核心逻辑

// 从RecordAccumulator 拿数据 然后发送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
      addToInflightBatches(batches);
// 中间省去了非核心逻辑
sendProduceRequests(batches, now);

如果继续跟踪的话最终是走到了selector.send()里面:

Send send = request.toSend(destination, header);
 InFlightRequest inFlightRequest = new InFlightRequest(
         clientRequest,
         header,
         isInternalRequest,
         request,
         send,
         now);
 this.inFlightRequests.add(inFlightRequest);
 selector.send(send);

6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {
          public void onComplete(ClientResponse response) {
              handleProduceResponse(response, recordsByPartition, time.milliseconds());
          }
      };
// 如果有返回
if (response.hasResponse()) {
          ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
          for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
              TopicPartition tp = entry.getKey();
              ProduceResponse.PartitionResponse partResp = entry.getValue();
              ProducerBatch batch = batches.get(tp);
              completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
          }
          this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
      } 

追踪到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
       // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
       produceFuture.set(baseOffset, logAppendTime, exception);

       // execute callbacks
       for (Thunk thunk : thunks) {
           try {
               if (exception == null) {
                   RecordMetadata metadata = thunk.future.value();
                   if (thunk.callback != null)
                       thunk.callback.onCompletion(metadata, null);
               } else {
                   if (thunk.callback != null)
                       thunk.callback.onCompletion(null, exception);
               }
           } catch (Exception e) {
               log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
           }
       }

       produceFuture.done();
   }

Thunk 这个其实就是我们在Append的时候的回调:
在这里插入图片描述
至此整个流程就完成了,从发送消息,到响应后回调我们的函数。

消息可靠性

// 所有消费者的配置都在ProducerConfig 里面
public static final String ACKS_CONFIG = "acks";

acks = 0:异步形式,单向发送,不会等待 broker 的响应
acks = 1:主分区保存成功,然后就响应了客户端,并不保证所有的副本分区保存成功
acks = all 或 -1:等待 broker 的响应,然后 broker 等待副本分区的响应,总之数据落地到所有的分区后,才能给到producer 一个响应

在可靠性的保证下,假设一些故障:

  • Broker 收到消息后,同步 ISR 异常:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制
  • Broker 收到消息,并同步 ISR 成功,但是响应超时:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制

可靠性能保证哪些,不能保障哪些?

  • 保证了消息不会丢失
  • 不保证消息一定不会重复(消息有重复的概率,需要消费者有幂等性控制机制)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1501238.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

web基础04-flex布局

1.概述 Flexbox​ 是 ​flexible box​ 的简称&#xff08;注&#xff1a;意思是“​灵活的盒子容器​”&#xff09;&#xff0c;是 CSS3 引入的新的布局模式。 它决定了元素如何在页面上排列&#xff0c;使它们能在不同的屏幕尺寸和设备下可预测地展现出来。 它之所以被称…

C++错误总结(1)

1.定义函数类型时&#xff0c;如果没有返回值&#xff0c;用void void swap(int &x, int &y){ int tem x; x y; y tem; } 2.输入时&#xff0c;不加换行符 cin >> a >> b >> c >> endl ;(红色标记的是错误的部分) 3.【逆序出入…

【LeetCode】升级打怪之路 Day 16:二叉树题型 —— 二叉树的构造

今日题目&#xff1a; 654. 最大二叉树105. 从前序与中序遍历序列构造二叉树106. 从中序与后序遍历序列构造二叉树889. 根据前序和后序遍历构造二叉树 目录 LC 654. 最大二叉树 【easy】 Problem&#xff1a;根据遍历序列来还原二叉树 【classic】 ⭐⭐⭐⭐⭐LC 105. 从前序与中…

华为设备小型园区网方案(有线+无线+防火墙)

&#xff08;一&#xff09;配置有线部分 1.配置LSW2 &#xff08;1&#xff09;创建相关vlan [LSW2]vlan batch 10 3000 &#xff08;2&#xff09;配置连接LSW1的Eth-Trunk1&#xff0c;透传VLAN 10 3000 [LSW2]int Eth-Trunk 1 [LSW2-Eth-Trunk1]port link-type trunk [LSW2…

微信小程序(五十四)腾讯位置服务示范(2024/3/8更新)

教程如下&#xff1a; 上一篇 1.先在官网注册一下账号&#xff08;该绑定的都绑定一下&#xff09; 腾讯位置服务官网 2.进入控制台 3.创建应用 3. 额度分配 4.下载微信小程序SDK 微信小程序SDK下载渠道 5.解压将俩js文件放在项目合适的地方 6.加入安全域名or设置不验证合…

自动化工程师涨薪难,原因出在这里

大家好&#xff0c;今天说说真实的工控行业&#xff0c;摒弃虚无的鸡汤&#xff0c;聊点实在的。 举个例子&#xff0c;某工做销售&#xff0c;卖电控器件&#xff0c;眼见PLC收入可观&#xff0c;开始感到压力。于是&#xff0c;他下定决心学PLC&#xff0c;报了培训班。毕业后…

Flink并行度

1、Task flink中每个算子就是一个Task&#xff0c;比如flatMap、map、sum是一个Task。 2、SubTask 算子有几个并行度SubTask的数量就是几&#xff0c;比如 3、算子并行度 算子并行度指的是每个算子的并行度&#xff0c;可用env.setParallelism(1);设置所有算子的并行度&am…

防御保护--IPSEC VPPN实验

实验拓扑图 实验背景&#xff1a;FW1和FW2是双机热备的状态。 实验要求&#xff1a;在FW5和FW3之间建立一条IPSEC通道&#xff0c;保证10.0.2.0/24网段可以正常访问到192.168.1.0/24 IPSEC VPPN实验配置&#xff08;由于是双机热备状态&#xff0c;所以FW1和FW2只需要配置FW1…

【树莓派+python】实现三色呼吸灯+按钮切换

文章目录 Traffic-lights电路连接在这里插入图片描述代码实现算法设计流程图python环境配置三色呼吸灯实现三色呼吸灯按钮控制 Traffic-lights 电路连接 【元件实物图】 图1为Button&#xff0c;按钮的状态控制SIG引脚的电平值。图2为RGB灯&#xff0c;有三种颜色&#xff1a…

项目解决方案:多地5G蓄能电站的视频监控联网系统设计方案

目 录 一、前言 二、系统架构设计 1、系统架构设计说明 2、系统拓扑图 三、关键技术 1. 5G支持技术 2. 视频图像处理技术 3. 数据融合与分析技术 四、功能特点 1. 高效可靠 2. 实时监测 3. 远程控制 4. 故障预测 五、应用前景 一、前言 随着能源…

Unity类银河恶魔城学习记录9-2 P83 Explosive crystal源代码

Alex教程每一P的教程原代码加上我自己的理解初步理解写的注释&#xff0c;可供学习Alex教程的人参考 此代码仅为较上一P有所改变的代码 【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili Crystal_Skill_Controller using System.Collections; using System.Colle…

Apache服务的搭建与配置

一、apache安装 systemctl stop firewalldsystemctl disable firewalldsetenforce 0yum -y install httpdsystemctl start httpdnetstat -ntlp | grep 80 二、认识主配置文件 # vim /etc/httpd/conf/httpd.conf ServerRoot "/etc/httpd" #定义工作目…

如何优雅地处理Web应用中的大文件上传

处理和上传大文件是Web开发中的常见挑战。一方面&#xff0c;我们需要确保应用的性能和响应性不被影响&#xff1b;另一方面&#xff0c;还要保证数据的安全性和完整性。接下来&#xff0c;我们将逐步探讨如何使用文件分片、Web Workers和SHA-256散列计算来解决这些问题。 问题…

数据结构---复杂度(1)

1.时间复杂度 衡量算法的好坏&#xff0c;使用大写的o来表示时间复杂度&#xff0c;通俗的讲&#xff0c;就是一个算法执行的次数&#xff1b; 时间复杂度就是数学里面的函数表达式&#xff1b;本质上是一个函数&#xff1b; 下面举几个例子&#xff1a; (1)这里的执行次数是…

【RISC-V 指令集】RISC-V 向量V扩展指令集介绍(二)-向量元素到向量寄存器状态的映射

1. 引言 以下是《riscv-v-spec-1.0.pdf》文档的关键内容&#xff1a; 这是一份关于向量扩展的详细技术文档&#xff0c;内容覆盖了向量指令集的多个关键方面&#xff0c;如向量寄存器状态映射、向量指令格式、向量加载和存储操作、向量内存对齐约束、向量内存一致性模型、向量…

测试人员需要掌握的 k8s 知识

前言 kubernetes 在容器编排领域已经形成统治地位&#xff0c;不管是开发、运维还是测试&#xff0c;掌握 kubernetes 都变得非常有必要。这篇文章通过 minikube 搭建一个简单的 kubernetes 运行环境。 安装虚拟机 主流的操作系统都支持 kubernetes&#xff0c;但是 windows…

禁止使用搜索引擎,你了解吗?

员工A&#xff1a;“我今天想搜索的时候&#xff0c;用不了浏览器了&#xff0c;你能用么&#xff1f;” 员工B&#xff1a;“不知道啊我试一下啊” “也不行” 员工C&#xff1a;“为什么啊&#xff1f;” 针对上述对话&#xff0c;我们不禁思考&#xff1a; 公司为什么禁…

一代影后阮玲玉:国际妇女节这天自杀,让很多人都感到震惊和惋惜!

一代影后阮玲玉:国际妇女节这天自杀&#xff0c;让很多人都感到震惊和惋惜&#xff01; 说起阮玲玉&#xff0c;那可真是咱们中国电影史上的一位大明星啊。#李秘书讲写作# 今日说起她演过的电影和成就&#xff0c;嘿&#xff0c;说出来可都是响当当的。 阮玲玉&#xff0c;原名…

算法刷题day25:多路归并

目录 引言概念一、鱼塘钓鱼二、技能升级三、序列 引言 关于这个多路并归蓝桥杯考的不是很多&#xff0c;如果要出的话&#xff0c;可能模型都会差不多&#xff0c;因为不会出太难的题&#xff0c;难题基本上都是贪心、DP之类的&#xff0c;所以好好刷题刷熟练就行了&#xff0…