【Flink网络数据传输(4)】RecordWriter(下)封装数据并发送到网络的过程

news2025/1/23 22:37:11

文章目录

  • 一. RecordWriter封装数据并发送到网络
    • 1. 数据发送到网络的具体流程
    • 2. 源码层面
      • 2.1. Serializer的实现逻辑
        • a. SpanningRecordSerializer的实现
        • b. SpanningRecordSerializer中如何对数据元素进行序列化
      • 2.2. 将ByteBuffer中间数据写入BufferBuilder
  • 二. BufferBuilder申请资源并创建
    • 1. ChannelSelectorRecordWriter创建BufferBuilder
    • 2. BroadcastRecordWriter创建BufferBuilder

一. RecordWriter封装数据并发送到网络

1. 数据发送到网络的具体流程

RecordWriter对接入的StreamRecord数据进行序列化并等待下游任务消费的过程,整个过程细节如下。

  1. StreamRecord通过RecordWriterOutput写入RecordWriter,并在RecordWriter中通过RecordSerializer组件将StreamRecord序列化为ByteBuffer数据格式。

  2. RecordWriter向ResultPartition申请BufferBuilder对象,用于构建BufferConsumer对象,将序列化后的二进制数据存储在申请到的Buffer中。ResultPartition会向LocalBufferPool申请MemorySegment内存块,用于存储Buffer数据

  3. BufferBuilder中会不断接入ByteBuffer数据,直到将BufferBuilder中的Buffer空间占满,此时会申请新的BufferBuilder继续构建BufferConsumer数据集。

  4. Buffer构建完成后,会调用flushTargetPartition()方法,让ResultPartition向下游输出数据,此时会通知NetworkSequenceViewReader组件开始消费ResultSubPartition中的BufferConsumer对象。

  5. 当BufferConsumer中Buffer数据被推送到网络后,回收BufferConsumer中的MemorySegment内存空间,继续用于后续的消息处理。

在这里插入图片描述

 

2. 源码层面

接下来我们从源码的角度了解RecordWriter具体处理数据的逻辑。在RecordWriterOutput中调用pushToRecordWriter方法将数据写出。

在这里插入图片描述

通过recordWriter.emit(serializationDelegate)方法,将数据元素发送到RecordWriter中进行处理。主要逻辑如下

  1. 序列化数据为ByteBuffer二进制数据,并缓存在SpanningRecordSerializer.serializationBuffer对象中。
  2. 将序列化器生成的中间数据复制到指定分区中,实际上就是将ByteBuffer数据复制到BufferBuiler对象中。
  3. 如果BufferBuiler中存储了完整的数据元素,就会清空序列化器的中间数据,因为序列化器中累积的数据不宜过大。
protected void emit(T record, int targetSubpartition) throws IOException {  
    checkErroneous();  
  
    targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);  
  
    if (flushAlways) {  
        targetPartition.flush(targetSubpartition);  
    }  
}



protected void emit(T record, int targetChannel) throws IOException, 
   InterruptedException {
   checkErroneous();
   // 数据序列化
   serializer.serializeRecord(record);
   // 将序列化器中的数据复制到指定分区中
   if (copyFromSerializerToTargetChannel(targetChannel)) {
      // 清空序列化器
      serializer.prune();
   }
}

 

2.1. Serializer的实现逻辑

接着了解如何将序列化器中的数据转换成Buffer并存储到ResultPartiton中,最终将数据发送到下游。

a. SpanningRecordSerializer的实现

SpanningRecordSerializer实现将序列化后的BytesBuffer数据写入BufferBuilder。

SpanningRecordSerializer对象主要包含了DataOutputSerializer serializationBuffer和ByteBuffer dataBuffer两个成员变量。

  • DataOutputSerializer可以将数据转换成二进制格式并存储在byte[]数组中。在serialization中会调用serializationBuffer.wrapAsByteBuffer()方法,将serializationBuffer中生成的byte[]数组转换成ByteBuffer数据结构,并赋值给dataBuffer对象。
  • ByteBuffer是Java NIO中用于对二进制数据进行操作的Buffer接口,底层有DirectByteBuffer和HeapByteBuffer等实现,通过ByteBuffer提供的方法,可以轻松实现对二进制数据的操作。

 

b. SpanningRecordSerializer中如何对数据元素进行序列化

SpanningRecordSerializer.serializeRecord()方法主要逻辑如下。

1)清理serializationBuffer的中间数据,实际上就是将byte[]数组的position参数置为0。
2)设定serialization buffer的初始容量,默认不小于4。
3)将数据元素写入serializationBuffer的bytes[]数组。(所有数据元素都实现了IOReadableWritable接口,可以直接将数据对象转换为二进制格式)
4)获取serializationBuffer的长度信息,并写入serializationBuffer。
5)将serializationBuffer中的byte[]数据封装为java.io.ByteBuffer数据结构,最终赋值到dataBuffer的中间结果中。

public void serializeRecord(T record) throws IOException {
   if (CHECKED) {
      if (dataBuffer.hasRemaining()) {
         throw new IllegalStateException("Pending serialization of previous record.");
      }
   }
   // 首先清理serializationBuffer中的数据
   serializationBuffer.clear();
   // 设定serialization buffer数量
   serializationBuffer.skipBytesToWrite(4);
   // 将record数据写入serializationBuffer
   record.write(serializationBuffer);
   // 获取serializationBuffer的长度信息并记录到serializationBuffer对象中
   int len = serializationBuffer.length() - 4;
   serializationBuffer.setPosition(0);
   serializationBuffer.writeInt(len);
   serializationBuffer.skipBytesToWrite(len);
   // 对serializationBuffer进行wrapp处理,转换成ByteBuffer数据结构
   dataBuffer = serializationBuffer.wrapAsByteBuffer();
}

Flink 1.12版本中RecordWriter就提供了serializeRecord的能力,没有单拎出来实现。

 

2.2. 将ByteBuffer中间数据写入BufferBuilder

首先BufferBuilder用于构建完整的Buffer数据。在copyFromSerializerToTargetChannel()方法中实现了将RecordSerializer中的ByteBuffer中间数据写入BufferBuilder的逻辑:

  1. 对序列化器进行Reset操作,重置初始化位置。
  2. 将序列化器的ByteBuffer中间数据写入BufferBuilder。
  3. 判断当前BufferBuilder是否构建了完整的Buffer数据,完成BufferBuilder中Buffer的构建。
  4. 判断SerializationResult中是否具有完整的数据元素,如果是则将pruneTriggered置为True,然后清空当前的BufferBuilder,并跳出循环。
  5. 创建新的bufferBuilder,继续从序列化器中将中间数据复制到BufferBuilder中。
  6. 指定flushAlways参数为True,调用flushTargetPartition()方法将数据写入ResultPartition。为防止过度频繁地将数据写入ResultPartiton,在RecordWriter中会有独立的outputFlusher线程(在构造器中),周期性地将构建出来的Buffer数据推送到ResultPartiton本地队列中存储,默认延迟为100ms。
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws 
   IOException, InterruptedException {
   // 对序列化器进行Reset操作,初始化initial position
   serializer.reset();
   // 创建BufferBuilder
   boolean pruneTriggered = false;
   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
   // 调用序列化器将数据写入bufferBuilder
   SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
   // 如果SerializationResult是完整Buffer
   while (result.isFullBuffer()) {
      // 则完成创建Buffer数据的操作
      finishBufferBuilder(bufferBuilder);
      // 如果是完整记录,则将pruneTriggered置为True
      if (result.isFullRecord()) {
         pruneTriggered = true;
         emptyCurrentBufferBuilder(targetChannel);
         break;
      }
      // 创建新的bufferBuilder,继续复制序列化器中的数据到BufferBuilder中
      bufferBuilder = requestNewBufferBuilder(targetChannel);
      result = serializer.copyToBufferBuilder(bufferBuilder);
   }
   checkState(!serializer.hasSerializedData(), "All data should be written at once");
     // 如果指定的flushAlways,则直接调用flushTargetPartition将数据写入ResultPartition
   if (flushAlways) {
      flushTargetPartition(targetChannel);
   }
   return pruneTriggered;
}

 

二. BufferBuilder申请资源并创建

1. ChannelSelectorRecordWriter创建BufferBuilder

在ChannelSelectorRecordWriter.getBufferBuilder()方法中定义了BufferBuilder的创建过程。

//1. targetChannel确认数据写入的分区,ID与下游InputGate中的InputChannelID是对应的
//2. 
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, 
   InterruptedException {
	//在ChannelSelectorRecordWriter中维护了
	//bufferBuilders[]数组,用于存储创建好的BufferBuilder对象
   if (bufferBuilders[targetChannel] != null) {
      return bufferBuilders[targetChannel];
   } else {
   //只有在无法从bufferBuilders[]中获取BufferBuilder时,
   //才会调用requestNewBufferBuilder()方法创建新的BufferBuilder对象。
      return requestNewBufferBuilder(targetChannel);
   }
}

requestNewBufferBuilder()方法逻辑如下

  1. 检查bufferBuilders[]的状态,确保bufferBuilders[targetChannel]为空或者bufferBuilders[targetChannel].isFinished()方法返回值为True。
  2. 调用targetPartition.getBufferBuilder()方法获取新的BufferBuilder,这里的targetPartition就是前面提到的ResultPartition。在ResultPartition中会向LocalBufferPool申请Buffer内存空间,用于存储序列化后的ByteBuffer数据。
  3. 向targetPartition添加通过bufferBuilder构建的BufferConsumer对象,bufferBuilder和BufferConsumer内部维护了同一个Buffer数据。BufferConsumer会被存储到ResultSubpartition的BufferConsumer队列中。
  4. 将创建好的bufferBuilder添加至数组,用于下次直接获取和构建BufferConsumer对象。
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
   IOException, InterruptedException {
   checkState(bufferBuilders[targetChannel] == null 
              || bufferBuilders[targetChannel].isFinished());
   // 调用targetPartition获取BufferBuilder
   BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
   // 向targetPartition中添加BufferConsumer
   targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(),
                                     targetChannel);
   // 将创建好的bufferBuilder添加至数组
   bufferBuilders[targetChannel] = bufferBuilder;
   return bufferBuilder;
}

 

2. BroadcastRecordWriter创建BufferBuilder

在BroadcastRecordWriter内部创建BufferBuilder的过程中,会将创建的bufferConsumer对象添加到所有的ResultSubPartition中,实现将Buffer数据下发至所有InputChannel,如下代码:

public BufferBuilder requestNewBufferBuilder(int targetChannel) 
    throws IOException, InterruptedException {
   checkState(bufferBuilder == null || bufferBuilder.isFinished());
   BufferBuilder builder = targetPartition.getBufferBuilder();
   if (randomTriggered) {
      targetPartition.addBufferConsumer(builder.createBufferConsumer(), targetChannel);
   } else {
      try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
         for (int channel = 0; channel < numberOfChannels; channel++) {
            targetPartition.addBufferConsumer(bufferConsumer.copy(), channel);
         }
      }
   }
   bufferBuilder = builder;
   return builder;
}

 

以上步骤就是在RecordWriter组件中将数据元素序列化成二进制格式,然后通过BufferBuilder构建成Buffer类型数据,最终存储在ResultPartition的ResultSubPartition中。

这是从Task的层面了解数据网络传输过程,下篇了解在TaskManager中如何构建底层的网络传输通道。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1495327.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java ~ word模板填充字符后输出到指定目录

word文件格式&#xff1a; jar包&#xff1a; <dependency><groupId>com.deepoove</groupId><artifactId>poi-tl</artifactId><version>1.10.0</version></dependency>样例代码&#xff1a; // 封装参数集合Map<String, Ob…

【异常处理】BadSqlGrammarException低级SQL语法异常

报错 org.springframework.jdbc.BadSqlGrammarException: ### Error querying database. Cause: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use …

MYSQL5.7报1205 - Lock wait timeout exceeded; try restarting transaction

简介 今天使用navicate操作添加时&#xff0c;mysql报错误&#xff0c;错误如下 原因 这个问题的原因是在mysql中产生了事务A&#xff0c;执行了修改的语句&#xff0c;比如&#xff1a; update t1 set aget18 where id1;此时事务并未进行提交&#xff0c;事务B开始运行&am…

Linux_防火墙无法启动问题

当查看防火墙状体的时候报如下错误 ● firewalld.service - firewalld - dynamic firewall daemonLoaded: loaded (/usr/lib/systemd/system/firewalld.service; enabled; vendor preset: enabled)Active: inactive (dead) since 四 2024-03-07 07:42:16 CST; 7s agoDocs: man…

【黑马程序员】STL实战--演讲比赛管理系统

演讲比赛管理系统 需求说明 比赛规则 程序功能 创建管理类 功能描述 提供菜单界面与用户交互 对演讲比赛流程进行控制 与文件的读写交互 创建演讲比赛管理类 新建speechManager.hpp #pragma once#include <iostream>using namespace std;// 设计演讲比赛类 clas…

光线追踪5- Surface normals and multiple objects

首先&#xff0c;让我们获取一个表面法线&#xff0c;以便进行着色。这是一个垂直于交点处表面的向量。在我们的代码中&#xff0c;我们需要做一个重要的设计决定&#xff1a;法线向量是否为任意长度&#xff0c;还是将其归一化为单位长度。 诱人的是&#xff0c;如果不…

react高阶组件:如何同时兼容class类组件和函数式组件。

场景&#xff1a; 每个页面都要实现分享功能&#xff0c;但是页面有些是用class类&#xff0c;有些又直接是函数式。 方案1&#xff1a; 写2套方法。各自引用。&#xff08;维护不太好&#xff0c;改要改2遍&#xff09; 方案2&#xff1a; 可以封一个 jsx的组件&#xff0c…

服务器cpu占用高没看到进程

现象&#xff1a; 1. 今天连服务器发现root密码被改了&#xff0c;再改回去&#xff0c;登录发现服务器很卡&#xff0c;top查看&#xff0c;可用的cpu为0&#xff0c;但是没看到明显的进程&#xff0c;很显然中了病毒 2. 发现crontab -l有异常的定时计划&#xff0c;给删除掉 …

DailyNotes个人笔记管理工具

DailyNotes 是记录笔记和跟踪任务的应用程序&#xff0c;使用markdown进行编辑 部署 下载镜像 docker pull m0ngr31/dailynotes创建目录并授权 mkdir -p /data/dailynotes/config_dir chmod -R 777 /data/dailynotes启动容器 docker run -d --restart always --name mynot…

【Web安全靶场】upload-labs-master 1-21

upload-labs-master 其他靶场见专栏… 文章目录 upload-labs-masterPass-01-js前端校验Pass-02-MIME校验Pass-03-其他后缀绕过黑名单Pass-04-.hatccess绕过Pass-05-点空格点代码逻辑绕过Pass-06-大小写绕过Pass-07-空格绕过Pass-08-点号绕过Pass-09-::$DATA绕过Pass-10-点空格…

2024年腾讯云8核16G18M服务器租用价格1668元15个月

2024年腾讯云8核16G18M服务器租用价格1668元15个月&#xff0c;270GB SSD云硬盘&#xff0c;3500GB月流量。 一张表看懂腾讯云服务器租用优惠价格表&#xff0c;一目了然&#xff0c;腾讯云服务器分为轻量应用服务器和云服务器CVM&#xff0c;CPU内存配置从2核2G、2核4G、4核8…

【开源】JAVA+Vue.js实现学校热点新闻推送系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 新闻类型模块2.2 新闻档案模块2.3 新闻留言模块2.4 新闻评论模块2.5 新闻收藏模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 新闻类型表3.2.2 新闻表3.2.3 新闻留言表3.2.4 新闻评论表3.2.5 新闻收藏表 四、系统展…

Python与FPGA——全局二值化

文章目录 前言一、Python全局128二、Python全局均值三、Python全局OTSU四、FPGA全局128总结 前言 为什么要进行图像二值化&#xff0c;rgb图像有三个通道&#xff0c;处理图像的计算量较大&#xff0c;二值化的图像极大的减少了处理图像的计算量。即便从彩色图像转成了二值化图…

HarmonyOS应用开发-环境搭建(windows环境)

官网地址&#xff1a;链接 DevEco Studio 3.1.1 Release&#xff1a;下载地址 1、安装DevEco Studio 直接安装即可 2、配置开发环境 1.运行已安装的DevEco Studio&#xff0c;首次使用&#xff0c;请选择Do not import settings&#xff0c;单击OK。 2.安装Node.js与ohpm。注…

STC89C52串口通信详解

目录 前言 一.通信基本原理 1.1串行通信与并行通信 1.2同步通信和异步通信 1.2.1异步通信 1.2.2同步通信 1.3单工、半双工与全双工通信 1.4通信速率 二.串口通信简介 2.1接口标准 2.2串口内部结构 2.3串口相关寄存器 三.串口工作方式 四.波特率计算 五.串口初始化步骤 六.实验…

【go语言开发】gorm库连接和操作mysql,实现一个简单的用户注册和登录

本文主要介绍使用gorm库连接和操作mysql&#xff0c;首先安装gorm和mysql依赖库&#xff1b;然后初始化mysql&#xff0c;配置连接池等基本信息&#xff1b;然后建表、完成dao、controller开发&#xff1b;最后在swagger中测试 文章目录 前言安装依赖库数据库初始化账号注册和登…

Hadoop运行搭建——系统配置和Hadoop的安装

Hadoop运行搭建 前言&#xff1a; 本文原文发在我自己的博客小站&#xff0c;直接复制文本过来&#xff0c;所以图片不显示(我还是太懒啦&#xff01;)想看带图版的请移步我的博客小站~ Linux镜像&#xff1a;CentOS7 系统安装&#xff1a;CentOS安装参考教程 系统网卡设置…

在win10中下载桌面版的docker并在docker中搭建运行基于linux的容器

在win10中下载桌面版的docker 1.背景 在很多时候需要linux系统部署项目&#xff0c;在win10中安装虚拟机并在虚拟机中安装linux系统比较繁琐&#xff0c;可以利用win10自带的hyper-v的虚拟机管理工具&#xff0c;打开该虚拟机管理工具&#xff0c;安装docker&#xff0c;并在…

【排序】希尔排序

一、思想 希尔排序&#xff0c;由D.L. Shell于1959年提出&#xff0c;是基于插入排序的一种改进算法。它的主要思想是将待排序的序列分割成若干个子序列&#xff0c;这些子序列的元素是相隔一定“增量”的。然后对每个子序列进行直接插入排序。随着增量的逐步减小&#xff0c;…

数学建模【整数规划】

一、整数规划简介 整数规划其实是线性规划和非线性规划的一个特殊情况&#xff0c;即有的变量取值只能是整数&#xff0c;不能是小数。这时候就需要一个新的函数来解决问题。 对于整数规划&#xff0c;分为线性整数规划和非线性整数规划 线性整数规划&#xff1a;MATLAB可进…