spark shuffle写操作——UnsafeShuffleWriter

news2024/9/25 23:22:41

PackedRecordPointer

使用long类型packedRecordPointer存储数据。
数据结构为:[24 bit partition number][13 bit memory page number][27 bit offset in page]

LongArray

LongArray不同于java中long数组。LongArray可以使用堆内内存也可以使用堆外内存。
MemoryLocation有两个变量obj、offset。可以表示内存的地址(堆内和堆外都可以)
堆内:obj是jvm对象的地址,offset是该对象的对象头大小。
堆外:obj是null,offset是堆外内存的地址。
MemoryBlock有三个变量obj、offset、length。表示一个内存块大小(堆内和堆外都可以)。
MemoryBlock是MemoryLocation子类
堆内:obj是jvm对象的地址,offset是该对象的对象头大小,length堆内内存大小
堆外:obj是null,offset是堆外内存的地址,length堆外内存大小
LongArray有四个变量memory、baseObj、baseOffset、length。
memory是MemoryBlock对象,表示占用内存块的大小
baseObj和baseOffset是用来确定内存的地址(堆内、堆外)
length表示可以保存long数据的量,所以就是内存大小除以8

LongArray存long数据,就是将long值放到index对应的内存地址。
index对应的地址就是baseObj和baseOffset+index*8

ShuffleInMemorySorter

  • array:类似long数组,存的是PackedRecordPointer,排序的时候是对这个数组进行排序,不是直接对消息进行排序,预留一部分空间用于排序
  • pos:新消息待插入位置
  • usableCapacity:longArray可以插入数据的容量。要留出部分空间用于排序
  • initialSize:初始内存大小

getUsableCapacity

usableCapacity变量是构造器中初始化调用getUsableCapacity。
getUsableCapacity是根据排序方法控制容量大小

reset

初始化pos、array、usableCapacity变量

expandPointerArray

  1. 数据从旧的arry迁移到新的array
  2. 释放旧的array内存
  3. 重新计算容量

insertRecord

生成long(包含partitionId、pageNumber、offset),放入longArray中

getSortedIterator

根据排序方法选择对应排序类。
RadixSort:https://baike.baidu.com/item/%E5%9F%BA%E6%95%B0%E6%8E%92%E5%BA%8F/7875498?fr=ge_ala
TimSort:https://zhuanlan.zhihu.com/p/695042849
最后生成ShuffleSorterIterator,此时只是partition有序

_SORT_COMPARATOR:_排序是比较partition,相同partition消息放在一起。

ShuffleSorterIterator是可一个类似iterator的类,它没有next方法,每次都是调用loadNext方法,将下一个值放入packedRecordPointer变量,再读取这个变量。

ShuffleExternalSorter

  • allocatedPages:申请下来用于存储数据的内存页集合
  • spills:因为内存不够,spill生成的文件
  • currentPage:当前往里写入的内存页
  • pageCursor:写入当前内存页的位置游标
  • peakMemoryUsedBytes:内存使用的峰值,这个这是用来在UI上展示

insertRecord

1.检查是否inMemSorter有空间写入新的long值,growPointerArrayIfNecessary
2.检查是否需要新的page,acquireNewPageIfNecessary
3.为消息生成在page的内存地址
4.将数据复制到page中
5.写入到inMemSorter

growPointerArrayIfNecessary

  1. 判断是否还有空间写入新的数据
  2. 申请两倍的使用空间大小的longArray
  3. 如果申请的page太大,会触发spill。page最大是17G,不知道会不会触发
  4. 触发过spill就调用freeArray释放longArray内存
  5. 申请到新的大容量的longArray,调用expandPointerArray进行扩容

spill

是调用spill(long size, MemoryConsumer trigger)方法

writeSortedFile将内存中的数据都写入到文件
freeMemory释放全部的数据对应的page

writeSortedFile

调用inMemSorter的getSortedIterator方法生成排好序的iterator。getSortedIterator方法可以在上面翻一下。此处只是对数据的long地址进行排序,不是对实际数据进行排序。

生成临时文件用来存放数据

首先生成临时文件对应的writer,然后遍历消息。
当分区发生变化,进行提交,生成分区对应的fileSegment。

根据内存数据地址找到对应数据。

  • recordPage数据存放的内存页
  • recordOffsetInPage数据在该内存页起始位置
  • dataRemaining数据的长度

将数据写入到文件中。

提交最后一个分区的写入,将分区信息写入到spillInfo中。
spill完成将对应的spillInfo保存到spills变量

freeMemory

遍历allocatedPages释放内存。初始化内存相关的变量。

acquireNewPageIfNecessary

如果page空间不够存放数据,申请新的page,更新相关的变量。

closeAndGetSpills

将缓存的数据写入到文件中,释放内存,关闭inMemSorter。

UnsafeShuffleWriter

write

遍历数据调用insertRecordIntoSorter写入到sorter中。
最后调用closeAndWriteOutput合并中间spill文件

insertRecordIntoSorter

将消息序列化成byte[],调用ShuffleExternalSorter的insertRecord方法。

closeAndWriteOutput

关闭sorter,将剩余的缓存数据生成文件。
调用mergeSpills将所有的spill文件合并成一个文件。

mergeSpills

  • 没有spill文件,直接生成空的data和index文件
  • 只有一个spill文件,没有合并文件的过程。调用transferMapSpillFile方法
  • 有多个spill文件,调用mergeSpillsUsingStandardWriter方法合并文件


LocalDiskSingleSpillMapOutputWriter的transferMapSpillFile方法是根据shuffleId、mapId生成临时的data数据文件,将spill文件重命名为临时data文件,最后生成正式data文件和index文件。

mergeSpillsUsingStandardWriter
根据compression和fastMerge选择对应的合并文件方式
1.transferTo-based fast merge:调用mergeSpillsWithTransferTo(spills, mapWriter)
2.fileStream-based fast merge:调用mergeSpillsWithFileStream(spills, mapWriter, null)
3.slow merge:调用mergeSpillsWithFileStream(spills, mapWriter, compressionCodec)
最后生成正式的data文件和index文件

mergeSpillsWithTransferTo

  1. 生成spill文件对应的channel
  2. 生成最终data临时文件的channel
  3. 对于每一个分区,遍历spill文件的channel将对应分区的数据写入的data临时文件

mergeSpillsWithFileStream

  1. 生成spill文件对应的stream
  2. 生成临时data文件对应的分区writer的stream
  3. 包装分区的stream,加上监控、加密、压缩等相关功能
  4. 对应每个分区,遍历spill文件的stream,加上limit、加密、压缩的功能,数据复制到分区writer的stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1914708.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

构建高精度室内定位导航系统,从3DGIS到AI路径规划的全面解析

室内定位导航系统是一种利用多种技术实现室内精准定位和导航的智能系统,即便没有卫星信号,也能实现精准导航。维小帮室内定位导航系统是基于自研的地图引擎与先进定位技术,结合智能路径规划算法,解决了人们在大型复杂室内场所最后…

python怎么判断字符串以什么结尾

在python编辑器中新建一个data.py。 写上自己的注释。 然后新建一个变量testname。 利用endswith来判断字符串是不是以“ar”结尾。 将结果打印出来。 选择“run”->“run”。 运行该程序,如果是,就会返回true。

深度探讨:无法恢复主文件表的挑战与解决方案

在数字时代,数据的安全与恢复成为了不容忽视的重要议题。其中,主文件表(Master File Table, MFT)作为文件系统的核心组件,一旦受损或无法恢复,将直接导致数据访问的障碍,给用户带来巨大困扰。本…

Vue在一个页面调用另一个同级页面的方法

1、建个中转站 2、然后在两个页面都引入它,注意引入路径。 import Utils from src/utils/way 3、调用方的写法 //eg :Utils.$emit(demo, msg) 4、被调用方的写法 //eg :Utils.$on(demo, val>{})

Playwright使用教程【附爬取Leetcode题目URLs以及有道翻译小软件】

前言 playwright是微软设计的一款工具,可以爬取网页,还可以自动化测试自己编写的网站,而且不像bs4、request编写爬虫那么复杂,也不需要考虑反爬技术,只需要知道最基础的前端知识,就可以高效、便捷的编写爬…

【算法训练记录——Day43】

Day43——动态规划Ⅴ 1.kamacoder52_携带研究材料2.leetcode518_零钱兑换Ⅱ3.leetcode377_组合总和Ⅳ 完全背包 1.kamacoder52_携带研究材料 思路:这里每种材料可以选择无数次,因此属于完全背包, 首先回顾一下01背包的核心代码 for(int i 0…

vue3 + tsx 表格 Action 单独封装组件用法

前言 先上图看右侧列 action 的 UI 效果: 正常来说,如果一个表格的附带 action 操作,我们一般会放在最右侧的列里面实现,这个时候有些UI 框架支持在 SFC 模板里面定义额外的 solt,当然如果不支持,更通用的…

医疗器械网络安全 | 漏洞扫描、渗透测试没有发现问题,是否说明我的设备是安全的?

尽管漏洞扫描、模糊测试和渗透测试在评估系统安全性方面是非常重要和有效的工具,但即使这些测试没有发现任何问题,也不能完全保证您的医疗器械是绝对安全的。这是因为安全性的评估是一个多维度、复杂且持续的过程,涉及多个方面和因素。以下是…

7.10号小项目部分说明

总体说明 糖锅小助手 我这次主要对上次糖锅小助手界面添加了一个侧边栏(侧边输入框放置了三个按钮,可以跳转到其他ai聊天界面,还可以退出聊天界面回到登录界面)和一个日期输入框(日期输入框获取时间,根据时…

史上最齐全电动弃流装置(弃流控制柜/流量式雨水弃流装置)这里都有,井座式、304不锈钢材质

电动弃流装置组成部分有:PLC控制柜、雨量传感器/电磁流量计、弃流装置 进出水管可以定制不同接口,可以适用于连接波纹管、PVC管 电动弃流装置的工作原理如下: 首先,雨量传感器或电磁流量计实时监测降雨量或水流流量等相关数据&…

TCP 握手数据流

这张图详细描述了 TCP 握手过程中,从客户端发送 SYN 包到服务器最终建立连接的整个数据流转过程,包括网卡、内核、进程中的各个环节。下面对每个步骤进行详细解释: 客户端到服务器的初始连接请求 客户端发送 SYN 包: 客户端发起…

AI降痕工具:一键去除论文中的AI代写痕迹

在这个充满创意和创新的时代,AI已经渗透到我们生活的方方面面。然而,随着AI的飞速发展,AI的痕迹在论文创作中愈发明显。 这不禁让人思考,如何让论文回归纯粹,展现人类独有的思考和见解。“论文去AI痕迹”不仅是对学术…

vue3-openlayers WebGL加载地图(栅格切片、矢量切片)

本篇介绍一下使用vue3-openlayers WebGL加载地图(栅格切片、矢量切片) 1 需求 vue3-openlayers WebGL加载地图(栅格切片、矢量切片) 2 分析 栅格切片使用ol-webgl-tile-layer 矢量切片使用ol-vector-tile-layer(默…

海外视频媒体发布/发稿:如何在国外媒体以视频的形式宣发

1. 背景介绍 在如今数字化时代,每个国家都拥有着各自的视频媒体平台,而主流媒体也都纷纷加入了视频发布的行列。视频媒体的宣发形式主要包括油管Youtube等视频分享平台,以及图文配合的发布方式。通过在视频中夹带链接,媒体可以以…

服务器安全运维方案介绍

随着信息技术的飞速发展,服务器已成为企业信息化建设的核心基础设施。然而,服务器安全运维问题也日益凸显,如何保障服务器的稳定运行和数据安全,已成为企业面临的重要挑战。本文旨在介绍一套全面的服务器安全运维方案,…

『C + ⒈』‘\‘

&#x1f942;在反斜杠(\)有⒉种最常用的功能如下所示&#x1f44b; #define _CRT_SECURE_NO_WARNINGS 1 #include<stdio.h> int main(void) {int a 10;int b 20;int c 30;if (a 10 &&\b 20 &&\c 30){printf("Your print\n");}else{prin…

【leetcode刷题笔记】02.复写零

题目&#xff1a; 思路&#xff1a; 代码实现 class Solution { public:void duplicateZeros(vector<int>& arr) {//1.模拟异地操作int prev0,cur0;int lenarr.size();while(cur<len){//arr[prev]不是0就都走一步if(arr[prev]!0){prev;cur;}//arr[prev]是0就pre…

FlutterFlame游戏实践#15 | 生命游戏 - 演绎启动

theme: cyanosis 本文为稀土掘金技术社区首发签约文章&#xff0c;30天内禁止转载&#xff0c;30天后未获授权禁止转载&#xff0c;侵权必究&#xff01; Flutter\&Flame 游戏开发系列前言: 该系列是 [张风捷特烈] 的 Flame 游戏开发教程。Flutter 作为 全平台 的 原生级 渲…

78000A 信号分析软件

思仪(Ceyear) 78000A 信号分析软件 78000A 信号分析软件是一款能够在电脑上运行的应用软件&#xff0c;预留了开放式的 SCPI 控制指令&#xff0c;可以远程控制信号/频谱分析仪采集数据&#xff0c;也可以回放仿真数据或者采集的历史数据文件&#xff0c;执行通用频谱测量、矢…

Windows 下安装 Memcached

Memcached 安装包下载 官网上并未提供 Memcached 的 Windows 平台安装包。 我们可以使用以下链接来下载&#xff0c;你需要根据自己的系统平台及需要的版本号点击对应的链接下载即可&#xff1a; 32位系统 1.2.5版本&#xff1a;http://static.jyshare.com/download/memcache…