一、上下文
《Spark-ShuffleWriter-UnsafeShuffleWriter》中提到在进行Page内存分配时,调用了一行代码
MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
这里就会走MemoryManager的钨丝内存分配,下面我们来详细看下
二、模式设定
final val tungstenMemoryMode: MemoryMode = {
//spark.memory.offHeap.enabled 默认 false 即:使用堆上分配
if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
//spark.memory.offHeap.size 默认 0
//堆外分配的绝对内存量
//此设置对堆内存使用没有影响,因此,如果执行器的总内存消耗必须符合某个硬限制,那么一定要相应地缩小JVM堆大小。
require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
//当运行JVM时,其中有sun的Unsafe包可用,并且底层系统具有未对齐的访问能力,则为true。
require(Platform.unaligned(),
"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
MemoryMode.OFF_HEAP
} else {
MemoryMode.ON_HEAP
}
}
如果我们想使用堆外内存分配,必须满足3个条件
1、将spark.memory.offHeap.enabled设置为true
2、将spark.memory.offHeap.size设置为一个正数
3、运行JVM时,其中有sun的Unsafe包可用,并且底层系统具有未对齐的访问能力
三、堆上堆外区别
堆是什么?
为了严谨我们看看官网给出的解释:
https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.5.3
Java虚拟机有一个堆,在所有Java虚拟机线程之间共享。堆是运行时数据区域,从中为所有类实例和数组分配内存。
堆是在虚拟机启动时创建的。对象的堆存储由自动存储管理系统(称为垃圾收集器)回收;对象永远不会被显式释放。Java虚拟机不采用特定类型的自动存储管理系统,可以根据实现者的系统要求选择存储管理技术。堆的大小可以是固定的,也可以根据计算的需要进行扩展,如果不需要更大的堆,则可以进行收缩。堆的内存不需要是连续的。
Java虚拟机实现可以为程序员或用户提供对堆初始大小的控制,以及如果堆可以动态扩展或收缩,则可以控制最大和最小堆大小。(-Xms,-Xmx等参数)
以下异常情况与堆有关:
如果计算需要比自动存储管理系统可用的堆更多的堆,Java虚拟机会抛出OutOfMemoryError。
堆上:ON_HEAP
Java分配的非空对象都是由JVM的gc管理的,这一部分称堆上内存分配。JVM会定期对垃圾内存进行回收,在某些特定的时间点,它会进行一次彻底的回收(full gc)。彻底回收时,垃圾收集器会对所有分配的堆内内存进行完整的扫描,这意味着会对Java应用造成性能影响。
堆上存储的都是对象,对象存储包含三个方面:
- 对象头:包含对象的元信息,如哈希码、锁信息等。在64位JVM上,对象头通常占用16字节的空间。
- 实例数据:存储对象的属性值。每个实例变量占用一定的空间,具体大小取决于变量类型和对齐要求。
- 对齐填充:为了保证对象在内存中的地址是8字节对齐的,可能会添加一些额外的填充字节。
在Spark的MemoryManager负责堆上分配的对象是HeapMemoryAllocator
堆外:OFF_HEAP
不受垃圾收集器管理的内存,受操作系统直接管理。因此只能存字节型数据,相比堆上分配更节省空间,寻址也更快。
在Spark的MemoryManager负责堆外分配的对象是UnsafeMemoryAllocator
四、HeapMemoryAllocator
1、allocate
//WeakReference 是 Java 中用于实现弱引用的类。
//当你希望引用一个对象,但是不希望这个对象被 JVM 的垃圾回收器(GC)视为垃圾回收的重要依据时,你可以使用弱引用。
//弱引用所引用的对象一旦被垃圾回收器标记为可回收的对象,就会被自动清除(即使垃圾回收器在运行时还没有进行实际的回收动作)。
//使用场景:
// 1、缓存对象:当你需要缓存一些对象,并且希望在内存紧张的时候能够释放这些对象,那么弱引用可以很好地满足这种需求。
// 2、监听器和事件处理:在事件监听器中,如果你希望监听器能够被垃圾回收器回收,但是又不希望在监听器不再使用的时候手动去移除监听器,那么弱引用可以很好地满足这种需求。
//很明显我们属于第1种场景
private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();
//1M
private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
private boolean shouldPool(long size) {
// 非常小的分配不太可能从池中受益。
return size >= POOLING_THRESHOLD_BYTES;
}
public MemoryBlock allocate(long size) throws OutOfMemoryError {
//多少个字,申请的内存都是 1 Byte 的倍数
int numWords = (int) ((size + 7) / 8);
//校准后的内存大小 单位 Byte
long alignedSize = numWords * 8L;
assert (alignedSize >= size);
//是否满足池机制 即申请的内存 >= 1/8 M 即 128 KB
if (shouldPool(alignedSize)) {
synchronized (this) {
//bufferPoolsBySize种存的是已经释放的内存,如果正好有这部分内存,可以直接拿来用
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
//循环这个缓存池 pool
while (!pool.isEmpty()) {
//取出一个 数组引用
final WeakReference<long[]> arrayReference = pool.pop();
final long[] array = arrayReference.get();
if (array != null) {
assert (array.length * 8L >= size);
//重新封装成 MemoryBlock 即 Page 三个参数
// 1、Long[]
// 2、去除头的偏移量
// 3、实际数据长度
//这样就方便程序直接操作数据的那块内存
//Platform 是对 Unsafe 的封装
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
//spark.memory.debugFill 默认 false
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
}
//将alignedSize 移除,表示其已经被用了
bufferPoolsBySize.remove(alignedSize);
}
}
}
//重新申请内存
long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
最终申请的内存如图所示:
MemoryBlock是管理内存中的这一页数据的对象
MemoryBlock
public class MemoryBlock extends MemoryLocation {
//未由TaskMemoryManagers分配的页面的特殊“pageNumber”值
public static final int NO_PAGE_NUMBER = -1;
//用于标记TaskMemoryManager中已释放页面的特殊“pageNumber”值
public static final int FREED_IN_TMM_PAGE_NUMBER = -2;
//MemoryAllocator(内存分配器释)放的页面的特殊“pageNumber”值。这使我们能够检测到双重释放。
public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
private final long length;
//可选页码;当此内存块表示由TaskMemoryManager分配的页面时使用。
//此字段是公共的,因此可以由位于不同包中的TaskMemoryManager进行修改
public int pageNumber = NO_PAGE_NUMBER;
public MemoryBlock(@Nullable Object obj, long offset, long length) {
super(obj, offset); // 就是一个 long []
this.length = length; //数据真实需要的大小
}
//返回内存块的大小
public long size() {
return length;
}
//创建指向 long [] 使用的内存的内存块
public static MemoryBlock fromLongArray(final long[] array) {
return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
}
//用指定的字节值填充内存块
public void fill(byte value) {
Platform.setMemory(obj, offset, length, value);
}
}
2、free
public void free(MemoryBlock memory) {
//如果要释放内存,必须满足以下几个条件
// 1、之前申请过内存,也就是 long [] 是存在的
assert (memory.obj != null) :
"baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
// 2、内存之前没有被释放
assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
// 3、TMM分配的页面必须首先通过TMM.freePage()释放,而不是直接在分配器free()中释放
assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
"free()";
//申请的内存大小
final long size = memory.size();
//spark.memory.debugFill 默认 false
//是否分别用0xa5和0x5a字节填充新分配和释放的内存。这有助于发现未初始化或已释放内存的误用,但会带来一些开销。
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
// 将页面标记为已释放(这样我们就可以检测到双重释放)
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
// 作为防止在释放bug后使用的额外防御层,我们对MemoryBlock进行了修改,以清空其对long[]数组的引用
long[] array = (long[]) memory.obj;
//将 页中的 long[] 置为null 且偏移量 置为 0
memory.setObjAndOffset(null, 0);
//校准后的内存分配大小
long alignedSize = ((size + 7) / 8) * 8;
//判断释放满足最小分配大小 即:128 KB ,如果小于它也是不会申请页成功的
if (shouldPool(alignedSize)) {
synchronized (this) {
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
bufferPoolsBySize.put(alignedSize, pool);
}
//如果下次还有alignedSize的内存申请可以直接用
pool.add(new WeakReference<>(array));
}
} else {
// Do nothing
}
}
五、UnsafeMemoryAllocator
1、allocate
public MemoryBlock allocate(long size) throws OutOfMemoryError {
//使用Unsafe来从堆外分配怎么大的内存,不用校准
long address = Platform.allocateMemory(size);
//同样构建一个MemoryBlock 只是将obj 设置成了 null 且 address 设置成了直接地址
MemoryBlock memory = new MemoryBlock(null, address, size);
//spark.memory.debugFill 默认 false
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
2、free
public void free(MemoryBlock memory) {
//释放内存前先检查 和堆上一样
assert (memory.obj == null) :
"baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
//调用Unsafe来释放内存
Platform.freeMemory(memory.offset);
//修改了MemoryBlock以重置其指针。
memory.offset = 0;
// 将页面标记为已释放(这样我们就可以检测到双重释放)。
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
}