flink内存管理(二):MemorySegment的设计与实现:(1)架构、(2)管理堆内/外内存、(3)写入/读取内存、(4)垃圾清理器

news2025/1/16 4:58:47

文章目录

  • 一. MemorySegment架构概览
  • 二. MemorySegment详解
      • 1.基于MemorySegment管理堆内存
      • 2.基于MemorySegment管理堆外内存
      • 3.基于Unsafe管理MemorySegment
      • 4.写入和读取内存数据
      • 5.创建MemoryCleaner垃圾清理器

  • 在flink内存管理(一)中我们已经知道:在Flink中会将对象序列化成二进制格式数据,然后写入预先分配的内存块,而这个内存块就是MemorySegment。
  • MemorySegments作为Flink内存管理的最小内存分配单元,能够申请堆内存和堆外内存空间,并对上层提供丰富且高效的内存数据读写方法。

一. MemorySegment架构概览

在flink1.16.1中MemorySegment已作为单独的一个类用于处理:堆内内存、堆外直接内存或堆外不安全内存。

单独的原因:关于效率的注意事项:为了获得最佳效率,MemorySegment不会通过继承来分离不同内存类型的实现,以避免在调用抽象方法时寻找具体实现的开销。

在这里插入图片描述

MemorySegment架构描述

  1. 为了尽可能避免直接实例化MemorySegment对象,Flink通过MemorySegmentFactory工厂类创建了MemorySegment。这是因为使用工厂模式控制类的创建,能够帮助JIT执行虚化(de-virtualized)和内联(inlined)的性能优化。
  2. DataOutputView接口扩展了java.io.DataOutput接口,提供了对一个或多个MemorySegment执行写入操作的视图。使用DataOutputView提供的方法可以灵活高效地将数据按顺序写入连续的MemorySegment内存块中
  3. DataInputView接口扩展了java.io.DataInput接口,提供了对一个或多个MemorySegment执行读取操作的视图。使用DataInputView提供的方法可以灵活高效地按顺序读取MemorySegment中的内存数据
  4. MemoryManager主要用于管理排序、哈希和缓存等操作对应的内存空间,且这些操作主要集中在离线计算场景中。
  5. NetworkBufferPool通过MemorySegmentFactory申请用于存储NetworkBuffer的MemorySegment内存空间

 

JIT编译

JIT(即时编译)是一种在程序运行时将代码编译成机器码的编译技术。JIT编译的优势在于它可以根据程序的实际运行情况进行优化,使得在特定平台上达到更好的性能。

MemorySegment是系统最底层的内存管理单元,可想而知,MemorySegment在整个系统中的使用频率是非常高的。在JIT编译过程中,最好的处理方式就是明确需要调用的方法,早期MemorySegment因为是独立的Final类,JIT编译时要调用的方法都是确定的。

之前的版本将HybridMemorySegment和HeapMemorySegment两个子类加载到JVM,此时JIT编译器只有在真正执行方法的时候才会确认是哪一个子类的方法,这样就无法提前判断使用哪一个实现的虚方法,也就无法直接调用,就会影响JVM的性能。

 

二. MemorySegment详解

1.基于MemorySegment管理堆内存

我们已经知道,MemorySegment只能通过MemorySegmentFactory创建,并且在MemorySegmentFactory中直接提供了基于堆内存创建MemorySegment的方法。

如下代码所示,在MemorySegmentFactory.wrap()方法中可以直接将byte[] buffer数组封装成MemorySegment,其中byte[]数组中的内存空间实际上就是从堆内存中申请的。

/**
*创建一个以给定堆内存区域为新的memory segments。
此方法应用于将短期字节数组转换为memory segments.
*/
public static MemorySegment wrap(byte[] buffer) {  
    return new MemorySegment(buffer, null);  
}

除了将已有的byte[]数组空间转换成MemorySegment之外,在MemorySegmentFactory中同时提供了通过分配堆内存空间创建MemorySegment的方法。如代码所示,在MemorySegmentFactory.allocateUnpooledSegment()方法中通过指定参数size申请固定数量的byte[]数组,这里new byte[size]的操作实际上就是从堆内存申请内存空间。

//分配一些非池化内存并创建一个代表该内存的新内存段。
//此方法类似于allocateUnpooledSegment(int, Object) ,但内存段的所有者为 null。
public static MemorySegment allocateUnpooledSegment(int size) {  
    return allocateUnpooledSegment(size, null);  
}

 

在MemorySegment构造器中,提供了对byte[] buffer堆内存进行初始化的逻辑,在方法中首先将buffer赋值给heapMemory,然后将address设定为BYTE_ARRAY_BASE_OFFSET,表示byte[]数组内容的起始部分,然后根据数组对象和偏移量获取元素值(getObject)。

设定offHeapBuffer和cleaner为空。offHeapBuffer和cleaner主要在OffHeap中使用,owner参数表示当前的所有者,通常情况下设定为空。

//创建一个新的memory segment,表示字节数组的内存。
//由于字节数组由堆内存支持,因此该内存段将其数据保存在堆上。缓冲区的大小必须至少为 8 字节。
//memory segment引用给定的owner
MemorySegment(@Nonnull byte[] buffer, @Nullable Object owner) {  
    this.heapMemory = buffer;  
    this.offHeapBuffer = null;  
    this.size = buffer.length;  
    this.address = BYTE_ARRAY_BASE_OFFSET;  
    this.addressLimit = this.address + this.size;  
    this.owner = owner;  
    this.allowWrap = true;  
    this.cleaner = null;  
    this.isFreedAtomic = new AtomicBoolean(false);  
}

2.基于MemorySegment管理堆外内存

在MemorySegment中通过ByteBuffer.allocateDirect(numBytes)方法申请堆外内存,然后用sun.misc.Unsafe对象操作堆外内存。

如下代码,在MemorySegmentFactory.allocateOffHeapUnsafeMemory()方法中,

  • 调用MemoryUtils.allocateUnsafe(size)方法获取堆外内存空间的地址,
  • 然后调用MemoryUtils.wrapUnsafeMemoryWithByteBuffer()方法从给定的内存地址中申请内存空间,并转换成ByteBuffer,
  • 最后通过HybridMemorySegment对象封装ByteBuffer,并返回给使用方进行使用。
/**
分配堆外不安全内存并创建一个新的内存段来表示该内存。
该段的创建会在其 java 包装对象即将被垃圾回收时调度其内存释放操作,类似于java.nio.DirectByteBuffer.DirectByteBuffer(int) 。不同之处在于,此内存分配不受选项 -XX:MaxDirectMemorySize 限制。
**/
public static MemorySegment allocateOffHeapUnsafeMemory(  
        int size, Object owner, Runnable customCleanupAction) {  
    long address = MemoryUtils.allocateUnsafe(size);  
    ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);  
    Runnable cleaner = MemoryUtils.createMemoryCleaner(address, customCleanupAction);  
    return new MemorySegment(offHeapBuffer, owner, false, cleaner);  
}

如代码清单8-6所示,在MemoryUtils.wrapUnsafeMemoryWithByteBuffer()方法中,

  • 首先调用DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, size)方法从堆外内存中申请内存空间并转换为ByteBuffer对象。
  • 对于DIRECT_BUFFER_CONSTRUCTOR变量,实际上就是创建DirectByteBuffer对应的私有构造器,调用allocateInstance()方法就能直接从堆外内存中创建DirectByteBuffer对象。
//用ByteBuffer包装不安全的native memory
static ByteBuffer wrapUnsafeMemoryWithByteBuffer(long address, int size) {  
    //noinspection OverlyBroadCatchBlock  
    try {  
        ByteBuffer buffer = (ByteBuffer) UNSAFE.allocateInstance(DIRECT_BYTE_BUFFER_CLASS);  
        UNSAFE.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);  
        UNSAFE.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);  
        buffer.clear();  
        return buffer;  
    } catch (Throwable t) {  
        throw new Error("Failed to wrap unsafe off-heap memory with ByteBuffer", t);  
    }  
}

在MemoryUtils中,DIRECT_BUFFER_CONSTRUCTOR通过反射获取。通过java.nio.DirectByteBuffer构造器创建ByteBuffer内存对象,并将其封装在MemorySegment中。

private static Class<?> getClassByName(  
        @SuppressWarnings("SameParameterValue") String className) {  
    try {  
        return Class.forName(className);  
    } catch (ClassNotFoundException e) {  
        throw new Error("Could not find class '" + className + "' for unsafe operations.", e);  
    }  
}

接下来就可以通过MemorySegment使用申请到的堆外内存存储数据了,数据最终会以二进制的形式存储在指定地址的堆外内存空间中,再看下构造方法。

public static MemorySegment allocateOffHeapUnsafeMemory(  
        int size, Object owner, Runnable customCleanupAction) {  
    long address = MemoryUtils.allocateUnsafe(size);  
    ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);  
    Runnable cleaner = MemoryUtils.createMemoryCleaner(address, customCleanupAction);  
    return new MemorySegment(offHeapBuffer, owner, false, cleaner);  
}

 

3.基于Unsafe管理MemorySegment

使用unsafe操作内存
MemorySegment能够同时操作堆内存和堆外内存,得益于sun.misc.Unsafe类的实现,Unsafe类提供了一系列可以直接操作内存的方法。sun.misc.Unsafe目前也已经下沉到MemorySegment中实现。

需要注意的是,Unsafe并不是JDK的标准实现,而是Sun的内部实现,存在于sun.misc包中,在Oracle发行的JDK中并不包含其源代码。
 
虽然我们在一般的并发编程中不会直接用到Unsafe,但对于很多Java基础类库,如Netty、Cassandra和Kafka等高性能的框架,基本都会使用Unsafe操作内存空间。Unsafe在提升Java运行效率以及增强Java语言底层操作内存的能力方面起了很大作用。

借助unsafe可以操作堆内、堆外内存使用
如代码所示,MemorySegment中使用sun.misc.Unsafe实现了对内存空间的管理,MemorySegment将创建出来的Unsage对象存储至静态变量,供所有MemorySegment持有者操作堆内存和堆外内存使用。

/** The unsafe handle for transparent memory copied (heap / off-heap). */  
@SuppressWarnings("restriction")  
private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

MemorySegment中的Unsafe对象主要通过MemoryUtils创建,和其他框架使用Unsafe库一样,都是通过反射的方式创建的,MemorySegment中的Unsafe创建完毕后,可以通过Unsafe库操作和管理堆内存和堆外内存空间。

 

4.写入和读取内存数据

在MemorySegment中将int类型的数据写入内存中

MemorySegment.putInt()方法中,需要传递:值+内存位置

  • (1)int value值以及(2)value对应的内存位置(index),其中int类型数据大小为4字节。
  • 根据address以及index参数计算位置偏移量(pos),然后调用UNSAFE.putInt()方法将int数值写入指定的内存空间。
/**将给定的 int 值(32 位,4 个字节)写入系统本机字节顺序中的给定位置。
此方法为整数写入提供了最佳速度,除非需要特定的字节顺序,否则应使用此方法。
在大多数情况下,只要知道写入值的字节顺序与读取值的字节顺序相同就足够了
(例如内存中的瞬时存储,或者 I/O 和网络的序列化),使得此方法更好的选择。
**/
public void putInt(int index, int value) {  
    final long pos = address + index;  
    if (index >= 0 && pos <= addressLimit - 4) {  
        UNSAFE.putInt(heapMemory, pos, value);  
    } else if (address > addressLimit) {  
        throw new IllegalStateException("segment has been freed");  
    } else {  
        // index is in fact invalid  
        throw new IndexOutOfBoundsException();  
    }  
}


 
读取MemorySegment中int类型的数据

在MemorySegment.getInt()方法中需要指定:位置+偏移量

  • (1)读取int值的位置信息
  • (2)根据公式address+index计算位置偏移量(pos),address实际上就是前面创建的arrayBaseOffset指标,即基本类型数组的偏移量;
  • (3)通过数组对象和偏移量获取int元素值。
/**
按照系统的本机字节顺序从给定位置读取 int 值(32 位,4 字节)。
此方法提供了整数读取的最佳速度,除非需要特定的字节顺序,否则应使用此方法。
在大多数情况下,只要知道写入值的字节顺序与读取值的字节顺序相同就足够了
(例如内存中的瞬时存储,或者 I/O 和网络的序列化),使得此方法更好的选择
**/
public int getInt(int index) {  
    final long pos = address + index;  
    if (index >= 0 && pos <= addressLimit - 4) {  
        return UNSAFE.getInt(heapMemory, pos);  
    } else if (address > addressLimit) {  
        throw new IllegalStateException("segment has been freed");  
    } else {  
        // index is in fact invalid  
        throw new IndexOutOfBoundsException();  
    }  
}

 

5.创建MemoryCleaner垃圾清理器

内存泄漏的情况

通常情况下,直接申请堆外内存是不安全的,一旦JVM进程无法直接访问动态内存,即无法通过指针指向一个可访问的起始对象并访问该对象的内存空间时**,内存空间就会变为不可访问内存**,在使用人工内存管理中,系统不可访问内存最终会导致内存泄漏

flink提供了MemoryCleaner

因此在创建堆外内存空间的同时,Flink也会调用MemoryUtils.createMemoryCleaner()方法创建内存垃圾清理器,专门用于回收和释放没有被正常释放的内存资源,防止内存泄漏。

 
createMemoryCleaner封装了两个逻辑:

  • 清理内存:UNSAFE.freeMemory(address)
  • 自定义清理策略:customCleanup
//MemorySegmentFactory. 创建MemoryCleaner
public static MemorySegment allocateOffHeapUnsafeMemory(  
        int size, Object owner, Runnable customCleanupAction) {  
    long address = MemoryUtils.allocateUnsafe(size);  
    ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);  
    Runnable cleaner = MemoryUtils.createMemoryCleaner(address, customCleanupAction);  
    return new MemorySegment(offHeapBuffer, owner, false, cleaner);  
}


//在MemoryUtils中,创建MemoryCleaner
/**
创建一个清理器来释放不安全的内存。
参数:
address – 要释放的不安全内存的地址 customCleanup – 清理 GC 的自定义操作
返回:
手动运行以释放不安全内存的操作
**/
static Runnable createMemoryCleaner(long address, Runnable customCleanup) {  
    return () -> {  
    
        releaseUnsafe(address);  
        customCleanup.run();  
    };  
}

private static void releaseUnsafe(long address) {  
    UNSAFE.freeMemory(address);  
}

customCleanup在flink内部定义了两种情况的清理逻辑:

  1. 内存大小来释放内存
package org.apache.flink.runtime.memory.UnsafeMemoryBudget.
//根据内存大小来释放内存
void releaseMemory(@Nonnegative long size) {  
    if (size == 0) {  
        return;  
    }  
    boolean released = false;  
    long currentAvailableMemorySize = 0L;  
    while (!released  
            && totalMemorySize  
                    >= (currentAvailableMemorySize = availableMemorySize.get()) + size) {  
        released =  
                availableMemorySize.compareAndSet(  
                        currentAvailableMemorySize, currentAvailableMemorySize + size);  
    }  
    if (!released) {  
        throw new IllegalStateException(  
                String.format(  
                        "Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes",  
                        size, currentAvailableMemorySize, totalMemorySize));  
    }  
}
  1. 没有清理策略,直接清理
private static final Runnable NO_OP = () -> {};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1401005.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

四.Winform使用Webview2加载本地HTML页面并互相通信

Winform使用Webview2加载本地HTML页面并互相通信 往期目录本节目标核心代码实现HTML代码实现的窗体Demo2代码效果图 往期目录 往期相关文章目录 专栏目录 本节目标 实现刷新按钮点击 C# winform按钮可以调用C# winform代码显示到html上点击HTML按钮可以调用C# winform代码更…

Python schedule任务调度及其用法

如果需要执行更复杂的任务调度&#xff0c;则可使用 Python 提供的 sched 模块。该模块提供了 sched.scheduler 类&#xff0c;该类代表一个任务调度器。 sched.scheduler(timefunctime.monotonic, delayfunctime.sleep) 构造器支持两个参数&#xff1a; timefunc&#xff1a…

思迅商旗-loaddata-信息泄露-未公开Day漏洞复现

0x01阅读须知 本文章仅供参考&#xff0c;此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考。本文章仅用于信息安全防御技术分享&#xff0c;因用于其他用途而产生不良后果,作者不承担任何法律责任&#…

比特币狂人引爆达沃斯论坛

点击查看TechubNews原文链接&#xff1a;比特币狂人引爆达沃斯论坛 比特币狂人、自称无政府资本主义者的阿根廷总统米莱在达沃斯的最新演讲引爆社交网络大讨论。 1 月 15 日&#xff0c;第 54 届世界经济论坛在瑞士阿尔卑斯山的达沃斯开幕。来自约 60 个国家首脑和跨国公司的领…

UE5 - Polycam扫描文件导入插件

Polycam是利用Gaussian Splatting进行3D重建的3D扫描相关软件&#xff0c;其对应有UE引擎的插件&#xff08;Plugin_XV3dGS&#xff09;可以把相关格式的文件导入到引擎&#xff1b; 首先Polycam的官网为&#xff1a;My Captures | Polycam 可以下载各种用户扫描文件&#xff…

Linux中的共享内存

定义&#xff1a; 共享内存允许两个或者多个进程共享物理内存的同一块区域&#xff08;通常被称为段&#xff09;。由于一个共享内存段会称为一个进程用户空间的一部分&#xff0c;因此这种 IPC 机制无需内核介入。所有需要做的就是让一个进程将数 据复制进共享内存中&#xff…

Mysql运维篇(三) MySQL数据库分库分表方案

一路走来&#xff0c;所有遇到的人&#xff0c;帮助过我的、伤害过我的都是朋友&#xff0c;没有一个是敌人&#xff0c;如有侵权请留言&#xff0c;我及时删除。 一、前言 关系型数据库本身比较容易成为系统瓶颈&#xff0c;单机存储容量、连接数、处理能力都有限。当单表的数…

Liunx系统和Window系统有什么区别

在信息技术世界里&#xff0c;操作系统扮演着至关重要的角色&#xff0c;它负责管理和控制计算机硬件与软件资源。Linux和Windows是市面上两个最流行的操作系统。接下来&#xff0c;我们将深入研究这两种操作系统的主要差异。 核心体系结构及源代码访问&#xff1a; 首先&#…

多线程-线程状态和线程安全(加锁-synchronized 关键字)

目录 1.线程状态 示例&#xff1a; 1.1线程状态和状态转移的意义 2.线程安全 2.1观察线程不安全 2.2线程不安全的原因 3.synchronized 关键字 - 监视器锁 monitor lock 3.1synchronized 的特性 1. 互斥 2.可重⼊ 应用示例&#xff1a; 3.2synchronized 使⽤⽰例 1.…

简单了解AJAX

文章目录 1、什么是AJAX2、AJAX快速入门3、Axios异步框架3.1、Axios 快速入门3.2、Axios 请求方式别名 1、什么是AJAX 概念&#xff1a;AJAX(Asynchronous JavaScript And XML)&#xff1a;异步的 JavaScript 和 XML AJAX作用&#xff1a; 与服务器进行数据交换&#xff1a;通…

【Unity学习笔记】New Input System 部分源码和测试用例补充

转载请注明出处&#xff1a;&#x1f517;https://blog.csdn.net/weixin_44013533/article/details/135630016 作者&#xff1a;CSDN|Ringleader| 主要参考&#xff1a; Unity官方Input System手册与API【Unity学习笔记】Unity TestRunner使用【Unity学习笔记】第十二 New Inp…

k8s资源介绍

Kubernetes架构图 Kubernetes系统用于管理分布式节点集群中的微服务或容器化应用程序&#xff0c;并且其提供了零停机时间部署、自动回滚、缩放和容器的自愈&#xff08;其中包括自动配置、自动重启、自动复制的高弹性基础设施&#xff0c;以及容器的自动缩放等&#xff09;等…

java黑马学习笔记

数组 变量存在栈中&#xff0c;变量值存放在堆中。 数组反转 public class test{public static void main(String[] args){//目标&#xff1a;完成数组反转int[] arr {10,20,30,40,50};for (int i 0,j arr.length - 1;i < j;i,j--){int tep arr[j]; //后一个值赋给临时…

数学建模常见算法的通俗理解(2)

目录 6 K-Means&#xff08;K-均值&#xff09;聚类算法&#xff08;无需分割数据即可分类&#xff09; 6.1 粗浅理解 6.2 算法过程 6.2.1 选定质心 6.2.2 分配点 6.2.3 评价 7 KNN算法&#xff08;K近邻算法&#xff09;&#xff08;K个最近的决定方案&#xff09; 7.…

怎么从视频中提取动图?一个方法快速提取gif

视频以连续的方式播放一系列图像帧&#xff0c;通过每秒播放的帧数&#xff08;帧率&#xff09;来创做&#xff0c;由于GIF动图则以循环播放一系列静态图像帧的方式展现动画效果。由于视频的优势在于流畅的动画、丰富的细节和长时间播放&#xff0c;因此常用于电影、电视节目、…

DAG最小路径点覆盖,最小路径可重复覆盖,详解

文章目录 前言有向无环图的最小路径点覆盖概念拆点二分图定理**证明** 最小路径可重复覆盖解决策略代码实现 OJ练习 前言 关于二分图&#xff1a;二分图及染色法判定 关于二分图最大匹配&#xff1a;二分图最大匹配——匈牙利算法详解 关于二分图带权最大完备匹配&#xff1…

Docker使用及部署python项目

一、准备项目 ​ 我写的是一个爬取某ppt网站的代码&#xff0c;就一个ppt1.py是爬虫&#xff0c;然后&#xff0c;ppts是存放下载的ppt的 二、准备requirement.txt文件 这个是需要哪些python库支持&#xff0c;写好 ​ 三、准备Dockerfile文件 需要一个名为Dockerfile的文件&…

基于SpringBoot的船运物流管理系统

文章目录 项目介绍主要功能部分代码展示设计总结项目获取方式 &#x1f345; 作者主页&#xff1a;超级无敌暴龙战士塔塔开 &#x1f345; 简介&#xff1a;Java领域优质创作者&#x1f3c6;、 简历模板、学习资料、面试题库【关注我&#xff0c;都给你】 &#x1f345;文末获取…

计算机组成原理04:一位乘法

原码的一位乘法是基于加法设计的。回想我们在竖式计算乘法时&#xff0c;都是通过一个数与另外一个数的另外一位相乘&#xff0c;最后相加得到结果。计算机计算原码一位乘法也是一样的原理。这里就涉及到计算时一个非常重要的操作&#xff1a;数据移位。 原码乘法问题分析 需…

13.浮动面板(PaletteSet)

愿你出走半生,归来仍是少年&#xff01; 环境&#xff1a;.NET FrameWork4.5、ObjectArx 2016 64bit、Entity Framework 6. 在CAD中进行通用组件开发或常驻界面的控件开发时&#xff0c;可使用PaletteSet作为停靠面板&#xff0c;然后将自己的空间放入其中。 1.示例 SearchRe…