Spark-ShuffleWriter-UnsafeShuffleWriter-钨丝内存分配

news2024/12/23 22:18:11

一、上下文

《Spark-ShuffleWriter-UnsafeShuffleWriter》中提到在进行Page内存分配时,调用了一行代码

MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);

 这里就会走MemoryManager的钨丝内存分配,下面我们来详细看下

二、模式设定

  final val tungstenMemoryMode: MemoryMode = {
    //spark.memory.offHeap.enabled  默认 false  即:使用堆上分配
    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
      //spark.memory.offHeap.size 默认 0 
      //堆外分配的绝对内存量
      //此设置对堆内存使用没有影响,因此,如果执行器的总内存消耗必须符合某个硬限制,那么一定要相应地缩小JVM堆大小。
      require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
        "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
      //当运行JVM时,其中有sun的Unsafe包可用,并且底层系统具有未对齐的访问能力,则为true。
      require(Platform.unaligned(),
        "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
      MemoryMode.OFF_HEAP
    } else {
      MemoryMode.ON_HEAP
    }
  }

如果我们想使用堆外内存分配,必须满足3个条件

1、将spark.memory.offHeap.enabled设置为true

2、将spark.memory.offHeap.size设置为一个正数

3、运行JVM时,其中有sun的Unsafe包可用,并且底层系统具有未对齐的访问能力

三、堆上堆外区别

堆是什么?

为了严谨我们看看官网给出的解释:

https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.5.3

Java虚拟机有一个堆,在所有Java虚拟机线程之间共享。堆是运行时数据区域,从中为所有类实例和数组分配内存。

堆是在虚拟机启动时创建的。对象的堆存储由自动存储管理系统(称为垃圾收集器)回收;对象永远不会被显式释放。Java虚拟机不采用特定类型的自动存储管理系统,可以根据实现者的系统要求选择存储管理技术。堆的大小可以是固定的,也可以根据计算的需要进行扩展,如果不需要更大的堆,则可以进行收缩。堆的内存不需要是连续的。

Java虚拟机实现可以为程序员或用户提供对堆初始大小的控制,以及如果堆可以动态扩展或收缩,则可以控制最大和最小堆大小。(-Xms,-Xmx等参数)

以下异常情况与堆有关:

如果计算需要比自动存储管理系统可用的堆更多的堆,Java虚拟机会抛出OutOfMemoryError。

堆上:ON_HEAP

Java分配的非空对象都是由JVM的gc管理的,这一部分称堆上内存分配。JVM会定期对垃圾内存进行回收,在某些特定的时间点,它会进行一次彻底的回收(full gc)。彻底回收时,垃圾收集器会对所有分配的堆内内存进行完整的扫描,这意味着会对Java应用造成性能影响。

堆上存储的都是对象,对象存储包含三个方面:

  1. 对象头‌:包含对象的元信息,如哈希码、锁信息等。在64位JVM上,对象头通常占用16字节的空间。
  2. 实例数据‌:存储对象的属性值。每个实例变量占用一定的空间,具体大小取决于变量类型和对齐要求。
  3. 对齐填充‌:为了保证对象在内存中的地址是8字节对齐的,可能会添加一些额外的填充字节。

在Spark的MemoryManager负责堆上分配的对象是HeapMemoryAllocator

堆外:OFF_HEAP

不受垃圾收集器管理的内存,受操作系统直接管理。因此只能存字节型数据,相比堆上分配更节省空间,寻址也更快。

在Spark的MemoryManager负责堆外分配的对象是UnsafeMemoryAllocator

四、HeapMemoryAllocator

1、allocate

  //WeakReference 是 Java 中用于实现弱引用的类。
  //当你希望引用一个对象,但是不希望这个对象被 JVM 的垃圾回收器(GC)视为垃圾回收的重要依据时,你可以使用弱引用。
  //弱引用所引用的对象一旦被垃圾回收器标记为可回收的对象,就会被自动清除(即使垃圾回收器在运行时还没有进行实际的回收动作)。
  //使用场景:
  //    1、缓存对象:当你需要缓存一些对象,并且希望在内存紧张的时候能够释放这些对象,那么弱引用可以很好地满足这种需求。
  //    2、监听器和事件处理:在事件监听器中,如果你希望监听器能够被垃圾回收器回收,但是又不希望在监听器不再使用的时候手动去移除监听器,那么弱引用可以很好地满足这种需求。
  //很明显我们属于第1种场景
  private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();
  //1M
  private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;

  private boolean shouldPool(long size) {
    // 非常小的分配不太可能从池中受益。
    return size >= POOLING_THRESHOLD_BYTES;
  }

  public MemoryBlock allocate(long size) throws OutOfMemoryError {
    //多少个字,申请的内存都是 1 Byte 的倍数
    int numWords = (int) ((size + 7) / 8);
    //校准后的内存大小 单位 Byte
    long alignedSize = numWords * 8L;
    assert (alignedSize >= size);
    //是否满足池机制 即申请的内存 >= 1/8 M 即 128 KB
    if (shouldPool(alignedSize)) {
      synchronized (this) {
        //bufferPoolsBySize种存的是已经释放的内存,如果正好有这部分内存,可以直接拿来用
        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
        if (pool != null) {
          //循环这个缓存池 pool
          while (!pool.isEmpty()) {
            //取出一个 数组引用
            final WeakReference<long[]> arrayReference = pool.pop();
            final long[] array = arrayReference.get();
            if (array != null) {
              assert (array.length * 8L >= size);
              //重新封装成 MemoryBlock  即 Page 三个参数
              //    1、Long[]
              //    2、去除头的偏移量
              //    3、实际数据长度
              //这样就方便程序直接操作数据的那块内存
              //Platform 是对 Unsafe 的封装
              MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
              //spark.memory.debugFill 默认  false
              if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
                memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
              }
              return memory;
            }
          }
          //将alignedSize 移除,表示其已经被用了
          bufferPoolsBySize.remove(alignedSize);
        }
      }
    }
    //重新申请内存
    long[] array = new long[numWords];
    MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
    }
    return memory;
  }

 最终申请的内存如图所示:

 MemoryBlock是管理内存中的这一页数据的对象

 MemoryBlock

public class MemoryBlock extends MemoryLocation {

  //未由TaskMemoryManagers分配的页面的特殊“pageNumber”值
  public static final int NO_PAGE_NUMBER = -1;

  //用于标记TaskMemoryManager中已释放页面的特殊“pageNumber”值
  public static final int FREED_IN_TMM_PAGE_NUMBER = -2;

  //MemoryAllocator(内存分配器释)放的页面的特殊“pageNumber”值。这使我们能够检测到双重释放。
  public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;

  private final long length;

  //可选页码;当此内存块表示由TaskMemoryManager分配的页面时使用。
  //此字段是公共的,因此可以由位于不同包中的TaskMemoryManager进行修改
  public int pageNumber = NO_PAGE_NUMBER;

  public MemoryBlock(@Nullable Object obj, long offset, long length) {
    super(obj, offset);  // 就是一个 long [] 
    this.length = length;  //数据真实需要的大小
  }

  //返回内存块的大小
  public long size() {
    return length;
  }

  //创建指向 long [] 使用的内存的内存块
  public static MemoryBlock fromLongArray(final long[] array) {
    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
  }

  //用指定的字节值填充内存块
  public void fill(byte value) {
    Platform.setMemory(obj, offset, length, value);
  }


}

2、free

 public void free(MemoryBlock memory) {
    //如果要释放内存,必须满足以下几个条件
    //    1、之前申请过内存,也就是 long []  是存在的
    assert (memory.obj != null) :
      "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
    //    2、内存之前没有被释放
    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
      "page has already been freed";
    //    3、TMM分配的页面必须首先通过TMM.freePage()释放,而不是直接在分配器free()中释放
    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
            || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
      "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
        "free()";

    //申请的内存大小
    final long size = memory.size();
    //spark.memory.debugFill 默认 false
    //是否分别用0xa5和0x5a字节填充新分配和释放的内存。这有助于发现未初始化或已释放内存的误用,但会带来一些开销。
    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
    }

    // 将页面标记为已释放(这样我们就可以检测到双重释放)
    memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;

    // 作为防止在释放bug后使用的额外防御层,我们对MemoryBlock进行了修改,以清空其对long[]数组的引用
    long[] array = (long[]) memory.obj;
    //将 页中的 long[] 置为null 且偏移量 置为 0
    memory.setObjAndOffset(null, 0);

    //校准后的内存分配大小 
    long alignedSize = ((size + 7) / 8) * 8;
    //判断释放满足最小分配大小 即:128 KB ,如果小于它也是不会申请页成功的
    if (shouldPool(alignedSize)) {
      synchronized (this) {
        LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
        if (pool == null) {
          pool = new LinkedList<>();
          bufferPoolsBySize.put(alignedSize, pool);
        }
        //如果下次还有alignedSize的内存申请可以直接用
        pool.add(new WeakReference<>(array));
      }
    } else {
      // Do nothing
    }
  }

五、UnsafeMemoryAllocator

1、allocate

  public MemoryBlock allocate(long size) throws OutOfMemoryError {
    //使用Unsafe来从堆外分配怎么大的内存,不用校准
    long address = Platform.allocateMemory(size);
    //同样构建一个MemoryBlock 只是将obj 设置成了 null 且 address 设置成了直接地址
    MemoryBlock memory = new MemoryBlock(null, address, size);
    //spark.memory.debugFill 默认 false
    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
    }
    return memory;
  }

2、free

  public void free(MemoryBlock memory) {
    //释放内存前先检查 和堆上一样
    assert (memory.obj == null) :
      "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
      "page has already been freed";
    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
            || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
      "TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";

    
    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
    }
    //调用Unsafe来释放内存
    Platform.freeMemory(memory.offset);
    //修改了MemoryBlock以重置其指针。
    memory.offset = 0;
    // 将页面标记为已释放(这样我们就可以检测到双重释放)。
    memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2145098.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python运行时错误:找不到fbgemm.dll

python运行时错误&#xff1a;找不到fbgemm.dll 报错&#xff1a; OSError: [WinError 126] 找不到指定的模块。 Error loading "D:\program\py\312\Lib\site-packages\torch\lib\fbgemm.dll" or one of its dependencies. 原因是Windows下缺失&#xff1a;libomp140…

Mastering openFrameworks_第十一章_网络

网络 网络为多个设备之间的数据交换提供了一种方式。它是一个主要组成部分&#xff0c;允许远程控制移动和平板设备应用程序中的一些参数&#xff0c;也用于使交互式项目在多台计算机上同步工作。在本章中&#xff0c;您将学习如何在openFrameworks项目中实现和使用OSC和TCP协…

BrainSegFounder:迈向用于神经影像分割的3D基础模型|文献速递--Transformer架构在医学影像分析中的应用

Title 题目 BrainSegFounder: Towards 3D foundation models for neuroimagesegmentation BrainSegFounder&#xff1a;迈向用于神经影像分割的3D基础模型 01 文献速递介绍 人工智能&#xff08;AI&#xff09;与神经影像分析的融合&#xff0c;特别是多模态磁共振成像&am…

系统安装CH384串口卡驱动

1. 解压驱动文件CH38XDRV.tar&#xff0c;并进入驱动目录 cd CH38XDRV/DRV_28S/LINUX/driver$ 2. 编译 sudo make edgeedge-PC:~/CH38XDRV/DRV_28S/LINUX/driver$ sudo make 请输入密码: 验证成功 make -C /lib/modules/4.19.0-arm64-desktop/build M/home/edge/CH38XDRV/DRV…

2024年【四川省安全员B证】新版试题及四川省安全员B证考试试卷

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 四川省安全员B证新版试题参考答案及四川省安全员B证考试试题解析是安全生产模拟考试一点通题库老师及四川省安全员B证操作证已考过的学员汇总&#xff0c;相对有效帮助四川省安全员B证考试试卷学员顺利通过考试。 1、…

数据库事务的详解

1、 介绍 什么是事务? 事务是一个原子操作。是一个最小执行单元。可以由一个或多个SQL语句组成&#xff0c;在同一个事务当中&#xff0c;所有的SQL语句都成功执行时&#xff0c;整个事务成功&#xff0c;有一个SQL语句执行失败&#xff0c;整个事务都执行失败。(一组操作同时…

计算机人工智能前沿进展-大语言模型方向-2024-09-14

计算机人工智能前沿进展-大语言模型方向-2024-09-14 1. Multimodal learning using large language models to improve transient identification of nuclear power plants B Qi, J Sun, Z Sui, X Xiao, J Liang - Progress in Nuclear Energy, 2024 使用大型语言模型进行多…

Html在线编辑器

Html在线编辑器提供富文本编辑器,在线文章编辑器,富文本编辑器,Html在线编辑器使用&#xff0c;具有高级功能的Html在线编辑器可全屏编辑,Web版Html在线编辑器在线使用,文章,网站编辑,微信公众号可以在线使用编辑器功能等。

select系统调用(实现I/O复用)

API 在一段指定时间内&#xff0c;监听用户感兴趣的文件描述符上的可读、可写、异常事件。 int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);文件描述符集合fd_set 是一个用于管理文件描述符集合的结构体。select调用…

flutter集成百度地图定位 ‘BMKLocationManager.h‘ file not found报错

一、写在前面 好久不见~最近接手了一个flutter的项目&#xff0c;需求是接入百度地图的定位插件。但是按照官网的文档来做&#xff0c;安卓没有问题&#xff0c;但是ios就惨了&#xff0c;各种编译报错。 flutter_bmflocation: ^3.6.0 集成报错 ‘BMKLocationManager.h’ fil…

Renesas R7FA8D1BH (Cortex®-M85)内部RTC的应用

目录 概述 1 软硬件 1.1 软硬件环境信息 1.2 开发板信息 1.3 调试器信息 2 FSP配置RTC 2.1 配置参数 2.2 RTC模块介绍 3 RTC相关函数 3.1 R_RTC_Open() 3.2 R_RTC_Close() 3.3 R_RTC_ClockSourceSet() 3.4 R_RTC_CalendarTimeSet() 3.5 R_RTC_CalendarTimeGet()…

HC-SR04超声波传感器详解(STM32)

目录 一、介绍 二、传感器原理 1.原理图 2.引脚描述 3.工作原理介绍 三、程序设计 main.c文件 ultrasonic.h文件 ultrasonic.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 HC-SR04超声波传感器是通过发送和接收超声波&#xff0c;利用时间差和声音传播速度…

Python编码系列—Python团队开发工作流:高效协作的艺术

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

煤炭检测系统源码分享

煤炭检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer Vision …

A股上市公司企业创新能力、质量、效率-原始数据+dofile+结果(2006-2023年)

上市公司的创新能力体现在其不断研发新技术、新产品和服务的能力上&#xff0c;这是企业保持竞争优势的关键&#xff1b;质量则是指公司所提供的产品或服务达到高标准的程度&#xff0c;高质量是赢得客户信任和市场份额的基础&#xff1b;效率则涵盖了生产运营中的资源利用程度…

天线工程师进阶指南:只会割铜皮式调天线,就Out了!跨学科天线设计介绍

❝本次推文简单介绍下跨学科天线设计。 什么是天线&#xff1f; 天线是一种变换器&#xff0c;它把传输线上传播的导行波&#xff0c;变换成在无界媒介&#xff08;通常是自由空间&#xff09;中传播的电磁波&#xff0c;或者进行相反的变换。 发射天线可以将来自发射机的高频…

资源创建方式

kubernetes支持两种创建资源的方式&#xff1a; 用kubectl命令直接创建&#xff0c;比如&#xff1a;kubectl run nginx-deployment --imagenginx1.7.9 --replicas2&#xff0c;在命令行中通过参数指定资源的属性 通过配置文件和kubectl apply创建&#xff0c;创建nginx.yml文…

9月18日

思维导图 配置桥接网络的过程 配置桥接网络 确保虚拟机提供了桥接模式菜单栏>编辑>虚拟机网络编辑器确保虚拟机可以设置桥接网络&#xff08;如无法通过桥接连接网络&#xff0c;则可以还原设置后重新尝试&#xff0c;如果还不行则找到VMware的软件安装包&#xff0c;双…

Pc端关于不同PDF阅读器的实际体验

因为马上研究生开学了&#xff0c;平时也会阅读很多pdf&#xff0c;实际上我们电脑上也自带一个pdf阅读的软件&#xff1a;也就是我们的edge&#xff0c;但是还是可能有些不够我们使用。下面是一些容易获取到的软件资源。 下面的评价仅是个人观点&#xff0c;请理性看待。 一…

【Pycharm使用技巧记录手册】批量检索与替换功能——辅助Yolo训练标签label配置文件构建

在yolo训练前的准备工作中&#xff0c;需要编写yaml配置文件中的信息。对于多分类问题&#xff0c;需要将其类别与索引一一对应。实践中&#xff0c;类别与索引的关系可能写在字典数据格式内&#xff0c;如何将其转换为配置文件内的信息&#xff0c;这是一个看起来简单但如果纯…