文章目录
- 一、项目介绍
- 二、什么是内存池
- 2.1 池化技术
- 2.2 内存池
- 2.3 内存池的作用
- 2.4 malloc
- 三、设计定长内存池
- 四、高并发内存池整体框架设计
- 六、threadcache
- 6.1 threadcache整体设计
- 6.2 threadcache哈希桶映射对齐规则
- 6.3 编写对齐和映射的相关函数
- 6.4 编写ThreadCache类
- 6.5 thread cacheTLS无锁访问
- 七、central cache
- 7.1 central cache整体设计
- 7.2 central cache结构设计
- span的结构
- span双链表结构
- central cache的结构
- 7.3 central cache类
- 单例模式创建
- 慢开始反馈调节算法
- 八、page cache
- 8.1 page cache整体设计
- 8.2 page cache申请释放
- 8.3 page cache类
- 九、项目源码
一、项目介绍
本项目是实现一个高并发的内存池,它的原型是Google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
本项目是模拟tcmalloc最核心的简化框架,实现一个简易的高并发内存池,目的是学习tcmalloc的精华,与博主先前模拟实现STL库来学习STL容器的方式类似。
这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。
二、什么是内存池
2.1 池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。
之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请一些资源放入“池”中,当需要资源时直接从“池”中获取,不需要时就将该资源重新放回“池”中即可。这样使用时就会变得非常快捷,可以大大提高程序的运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
2.2 内存池
内存池是指程序预先向操作系统申请一块足够大的内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当释放内存的时候,并不是真正将内存返回给操作系统,而是将内存返回给内存池。当程序退出时(或某个特定时间),内存池才将之前申请的内存真正释放。
2.3 内存池的作用
内存池主要解决的就是效率的问题,它能够避免让程序频繁的向系统申请和释放内存。其次,内存池作为系统的内存分配器,还需要尝试解决内存碎片的问题。
内存碎片分为内部碎片和外部碎片:
- 外部碎片是一些空闲的小块内存区域,由于这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配申请需求。
- 内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
注意: 内存池尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片的产生。
2.4 malloc
C/C++中我们要动态申请内存并不是直接去堆申请的,而是通过malloc函数去申请的,包括C++中的new实际上也是封装了malloc函数的。我们申请内存块时是先调用malloc,malloc再去向操作系统申请内存。
malloc实际就是一个内存池,malloc相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用,当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1eBB5KNH-1673140698313)(null)]
malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。
推荐文章:
- 一文了解,Linux内存管理,malloc、free 实现原理
- malloc的底层实现(ptmalloc)
- malloc()背后的实现原理
三、设计定长内存池
malloc是一个通用的内存池,在什么场景下都可以使用,但这也意味着malloc在什么场景下都不会有很高的性能,因为malloc并不是针对某种场景专门设计的。
定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此我们可以将其性能做到极致,并且在实现定长内存池时不需要考虑内存碎片等问题,因为我们申请/释放的都是固定大小的内存块。
我们可以通过实现定长内存池来熟悉一下对简单内存池的控制,其次,这个定长内存池后面会作为高并发内存池的一个基础组件。
**如何实现定长?**👈
比如我们可以使用非类型模板参数,使得在该内存池中申请到的对象的大小都是N。
template<size_t N>
class Pool
{};
在创建内存池时,内存池可以根据传入的对象类型的大小来实现“定长”,因此我们可以通过使用模板参数来实现“定长”,比如创建定长内存池时传入的对象类型是int,那么该内存池就只支持4字节大小内存的申请和释放。
template<class T>
class Pool
{};
**如何直接向堆申请空间?**👈
首先向系统申请一块内存空间,然后对其进行管理。要想直接向堆申请内存空间,在Windows下,可以调用VirtualAlloc函数;在Linux下,可以调用brk或mmap函数。
通过条件编译可以将对应平台下向堆申请内存的函数进行封装,这样就可以不再关心代码在不同平台使用的问题,提高了兼容性,当我们需要直接向堆申请内存时直接调用我们封装后的SystemAlloc函数即可。
#ifdef _WIN32
#include<windows.h> // VirtualAlloc
#else
//...
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);//绕过malloc,直接从内核申请内存
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
**定长内存池中应该包含哪些成员变量?**👈
对于向堆申请到的大块内存,我们可以用一个指针来对其进行管理,但仅用一个指针肯定是不够的,我们还需要用一个变量来记录这块内存的长度:_remainBytes。
由于此后我们需要将这块内存进行切分,为了方便切分操作,指向这块内存的指针最好是字符指针,因为指针的类型决定了指针向前或向后走一步有多大距离,对于字符指针来说,当我们需要向后移动n个字节时,直接对字符指针进行加n操作即可。
另外,释放回来的定长内存块也需要被管理,我们可以将这些释放回来的定长内存块链接成一个链表,这里我们将管理释放回来的内存块的链表叫做自由链表,为了能找到这个自由链表,我们还需要一个指向自由链表的指针:_freeList。这个自由链表中:每一个还回来的内存块都用前面的4/8字节来保存下一个内存块的地址,因此连接起来。
因此,定长内存池当中包含三个成员变量:
- _memory:指向大块内存的指针。
- _remainBytes:大块内存切分过程中剩余字节数。
- _freeList:还回来过程中链接的自由链表的头指针。
**内存池如何管理释放的对象?**👈
对于还回来的定长内存块,我们可以用自由链表将其链接起来,但我们并不需要为其专门定义链式结构,我们可以让内存块的前4个字节(32位平台)或8个字节(64位平台)作为指针,存储后面内存块的起始地址即可。
因此在向自由链表插入被释放的内存块时,先让该内存块的前4个字节或8个字节存储自由链表中第一个内存块的地址,然后再让_freeList指向该内存块即可,也就是一个简单的链表头插操作。
如何让一个指针在32位平台下解引用后能向后访问4个字节,在64位平台下解引用后能向后访问8个字节?
32位平台下指针的大小是4个字节,64位平台下指针的大小是8个字节。而指针指向数据的类型,决定了指针解引用后能向后访问的空间大小,因此我们这里需要的是一个指向指针的指针,这里使用二级指针就行了。
当我们需要访问一个内存块的前4/8个字节时,我们将该内存块的地址强转为二级指针类型,这个时候可以理解为:对该指针解引用所能访问的权限就变成了一个一级指针大小,而指针的大小取决于平台,因此在32位平台下访问的就是4个字节,在64位平台下访问的就是8个字节,此时我们访问到了该内存块的前4/8个字节。
static void*& NextObj(void* obj) //求链表的下一个地址
{
return *(void**)obj;
}
在释放对象时,我们应该显示调用该对象的析构函数清理该对象,因为该对象可能还管理着其他某些资源,如果不对其进行清理那么这些资源将无法被释放,就会导致内存泄漏。
//释放对象
void Delete(T* obj)
{
obj->~T();//显示调用T的析构函数清理对象
//头插
NextObj(obj) = _freeList;//obj的前 指针大小 的字节数里保存下一个内存的地址
_freeList = obj;
}
内存池申请对象(内存)的过程👈
- 内存池应该优先把还回来的内存块对象再次重复利用,因此如果自由链表当中有内存块的话,就直接从自由链表头删一个内存块进行返回即可。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E18F3VHX-1673140698370)(null)]
- 如果自由链表当中没有内存块,就在大块内存中切出定长的内存块进行返回,当内存块切出后及时更新
_memory
指针的指向,以及_remainBytes
的值即可。
由于当内存块释放时我们需要将内存块链接到自由链表当中,因此我们必须保证切出来的对象至少能够存储得下一个指针大小的地址,所以当对象的大小小于当前所在平台指针的大小时,需要分配一个指针大小的内存块给申请者。
此外,当大块内存已经不足以切分出一个对象时,我们就应该调用我们封装的SystemAlloc函数,再次向堆申请一块内存空间,此时也要注意及时更新_memory
指针的指向,以及_remainBytes
的值。
T* New()
{
T* obj = nullptr;
// 优先使用归还回来的内存块对象,重复利用
if (_freeList)//不为空
{
obj = (T*)_freeList;//从自由链表头删一个对象
//_freeList里面保存的是第一个内存块的地址
_freeList = NextObj(_freeList);
}
else
{
// 剩余内存不够一个对象大小时,则重新开大块空间
if (_remainBytes < sizeof(T))
{
_remainBytes = 8 * 1024;
//这里相当于是申请一页物理内存
_memory = (char*)SystemAlloc(_remainBytes >> 13);//右移13位相当于除以8KB,得到的数值单位就是页
}
obj = (T*)_memory;
//如果 使用的类型的大小 比指针小,就给他一个指针大小的字节数(多开空间),便于后续回收
size_t objSize = (sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T));
_memory += objSize;
_remainBytes -= objSize;
}
// 定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
与释放对象时需要显示调用该对象的析构函数一样,当内存块切分出来后,我们也应该使用定位new,显示调用该对象的构造函数对其进行初始化。
定长内存池整体代码:
template<class T>
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr;
// 优先使用归还回来的内存块对象,重复利用
if (_freeList)//不为空
{
obj = (T*)_freeList;//从自由链表头删一个对象
//_freeList里面保存的是第一个内存块的地址
_freeList = NextObj(_freeList);
}
else
{
// 剩余内存不够一个对象大小时,则重新开大块空间
if (_remainBytes < sizeof(T))
{
_remainBytes = 8 * 1024;
_memory = (char*)SystemAlloc(_remainBytes >> 13);//右移13位相当于除以8KB
}
obj = (T*)_memory;
//如果 使用的类型的大小 比指针小,就给他一个指针大小的字节数(多开空间),便于后续回收
size_t objSize = (sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T));
_memory += objSize;
_remainBytes -= objSize;
}
// 定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
//释放对象
void Delete(T* obj)
{
obj->~T();//显示调用T的析构函数清理对象
//头插
NextObj(obj) = _freeList;//obj的前 指针大小 的字节数里保存下一个内存的地址
_freeList = obj;
}
private:
char* _memory = nullptr; // 指向大块内存的指针
size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数
void* _freeList = nullptr; // 用来保存和指向归还内存的头指针,连成一个链表
};
因为malloc是一个通用的内存池,而定长内存池是专门针对申请定长对象而设计的,所以在这种特殊场景下定长内存池的效率更高。
四、高并发内存池整体框架设计
项目背景
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
-
性能问题。
-
多线程环境下,锁竞争问题。
-
内存碎片问题。
高并发内存池主要由以下三个部分构成:
- thread cache: 线程缓存,是每个线程独有的,用于 小于等于256KB 的内存分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
- central cache: 中心缓存,所有线程所共享的,当thread cache需要内存时会按需从central cache中获取内存,central cache在合适的时机回收thread cache中的对象,避免一个线程占用太多的内存,导致其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象需要加锁,首先这里用的是桶锁,其次只有thread cache没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache: 页缓存,存储的内存是以页为单位进行存储及分配的,当central cache需要内存时,page cache会分配出一定数量的页并切割成定长大小的小块内存分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
总结:每个部分的作用
-
thread cache主要解决锁竞争的问题,每个线程独享自己的thread cache,当自己的thread cache中有内存时该线程不会去和其他线程进行竞争,每个线程只要在自己的thread cache申请内存就行了。
-
central cache主要起到一个居中调度的作用,每个线程的thread cache需要内存时就从central cache获取,而当thread cache的内存多了就会将内存还给central cache,其作用类似于一个中枢,因此取名为中心缓存。
-
page cache就负责提供以页为单位的大块内存,当central cache需要内存时就会去向page cache申请,而当page cache没有内存了就会直接去找系统,也就是直接去堆上按页申请内存块。
六、threadcache
6.1 threadcache整体设计
我们上面实现的定长内存池只支持固定大小的内存块的申请和释放,所以定长内存池只需要一个自由链表管理释放回来的内存块。现在我们要支持申请和释放不同大小的内存块,那么我们就需要多个自由链表来管理释放回来的多种不同大小的内存块,因此thread cache实际上一个哈希桶结构,每个桶中存放的都是一个自由链表,这些自由链表管理不同大小的内存块。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
thread cache支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存。
所以我们可以让不同内存块根据他们的字节数大小来按照某种对齐规则分类,例如我们让这些字节数都按照8字节进行向上对齐,那么thread cache的结构就是下面这样的,当线程申请1 ~ 8字节的内存时会直接给出8字节,而当线程申请9 ~ 16字节的内存时会直接给出16字节,以此类推。
因此当线程要申请某一大小的内存块时,就需要经过某种计算得到对齐后的字节数,进而找到对应的哈希桶,如果该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回;如果该自由链表已经为空了,那么就需要向下一层的central cache进行获取。
但此时由于对齐的原因,就可能会产生一些碎片化的内存无法被利用,比如线程只申请了6字节的内存,而thread cache却直接给了8字节的内存,这多给出的2字节就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内部碎片。
申请内存:
-
当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
-
如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
-
如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
-
当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
-
当链表的长度过长,则回收一部分内存对象到central cache。
接下来我们对 自由链表结构 进行封装,先实现Push和Pop成员函数:
Push:将内存块头插到自由链表里
Pop:从自由链表头删一个内存块,并将该内存块返回给外部使用。
//管理切分好的小对象的 自由链表
class FreeList
{
public:
void Push(void* obj)//将释放的对象头插到自由链表
{
assert(obj);
// 头插
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
void* Pop()//从自由链表头部获取一个对象
{
assert(_freeList);
// 头删
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
private:
void* _freeList = nullptr;//自由链表
size_t _size = 0;//记录链表里的内存块个数
};
thread cache实际就是一个数组,数组中存储的就是一个个的自由链表,数组存储自由链表的个数取决于字节数对齐时使用的映射对齐规则。
6.2 threadcache哈希桶映射对齐规则
让不同范围的字节数按照不同的对齐数进行对齐,具体对齐方式如下:
字节数 | 对齐数 | 哈希桶下标 |
---|---|---|
[1, 128] | 8 | [0, 16) |
[128+1, 1024] | 16 | [16, 72) |
[1024+1, 8*1024] | 128 | [72, 128) |
[8 * 1024+1, 64 * 1024] | 1024 | [128, 184) |
[64 * 1024, 256 * 1024] | 8*1024 | [184, 208) |
空间浪费率计算:
$浪费率 = 浪费的字节数 / 对齐后的字节数 $,最大浪费率:分子取最大,分母取最小
比如129~1024这个区间,该区域的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,也就是144,那么该区间的最大浪费率也就是$15 ÷ 144 ≈ 10.42 $%。同样的道理,后面两个区间的最大浪费率分别是 127 ÷ 1152 ≈ 11.02 127 ÷ 1152 ≈ 11.02 127÷1152≈11.02%和 1023 ÷ 9216 ≈ 11.10 1023 ÷ 9216 ≈ 11.10 1023÷9216≈11.10%。
6.3 编写对齐和映射的相关函数
有了字节数的对齐规则后,我们就需要提供两个对应的函数,分别用于获取某一字节数对齐后的字节数,以及该字节数对应的哈希桶下标。关于处理对齐和映射的函数,我们可以将其封装到一个类当中。
//管理对齐和映射等关系
class SizeClass
{
public:
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes);
//获取对应哈希桶的下标
static inline size_t Index(size_t bytes);
};
SizeClass类当中的成员函数最好设置为静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数。
RoundUp:
位运算的速度更快。
// 计算对象大小的对齐规则
class SizeClass
{
public:
/*size_t _RoundUp(size_t bytes, size_t alignNum)//对齐数alignNum
{
size_t alignSize;//保存调整后的数字
if (bytes % alignNum != 0)
{
alignSize = (bytes / alignNum + 1)*alignNum;
}
else
{
alignSize = bytes;
}
return alignSize;
}*/
// 1-8
static inline size_t _RoundUp(size_t bytes, size_t alignNum)//对齐数alignNum
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
static inline size_t RoundUp(size_t bytes)//向上对齐,返回该字节对齐哪个数字
{
if (bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else
{
//大于256KB的按页对齐
return _RoundUp(bytes, 1 << PAGE_SHIFT);
}
}
/*size_t _Index(size_t bytes, size_t alignNum)
{
if (bytes % alignNum == 0)
{
return bytes / alignNum - 1;
}
else
{
return bytes / alignNum;
}
}*/
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表的 桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };//每一段的桶的个数
if (bytes <= 128) {
return _Index(bytes, 3);
}
else if (bytes <= 1024) {
//前面是计算该字节在当前段的第几个下标, 后面是加上前面所有段的桶的总数
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024) {
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024) {
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024) {
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false);
}
return -1;
}
};
Index:
在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理。
// 计算对象大小的对齐规则
class SizeClass
{
public:
/*size_t _Index(size_t bytes, size_t alignNum)
{
if (bytes % alignNum == 0)
{
return bytes / alignNum - 1;
}
else
{
return bytes / alignNum;
}
}*/
//alignShift:对齐数写成2的n次方的形式后,所对应的n值
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表的 桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };//每一段的桶的个数
if (bytes <= 128) {
return _Index(bytes, 3);//3: 2^3 = 8
}
else if (bytes <= 1024) {
//前面是计算该字节在当前段的第几个下标, 后面是加上前面所有段的桶的总数
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024) {
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024) {
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024) {
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false);
}
return -1;
}
};
6.4 编写ThreadCache类
根据对齐规则可知,thread cache中桶的个数,也就是自由链表的个数是208,以及thread cache允许申请的最大内存大小256KB,我们可以将这些数据按照如下方式进行定义。
//小于等于MAX_BYTES,就找thread cache申请
//大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;
thread cache就是一个存储208个自由链表的数组。
#pragma once
#include "Common.h"
class ThreadCache
{
public:
//申请内存对象
void* Allocate(size_t size);
//释放内存对象
void Deallocate(void* ptr, size_t size);
// 从中心缓存获取内存对象
void* FetchFromCentralCache(size_t index, size_t size);
//释放对象导致链表过长,回收内存到中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freeLists[NFREELISTS];
};
static _declspec(thread) ThreadCache* pTLS_ThreadCache = nullptr;
在thread cache申请对象时,通过申请的字节数计算出对应的哈希桶下标,如果对应的桶中自由链表不为空,则从该自由链表中取出一个对象进行返回即可;但如果此时自由链表为空,那么我们就需要从central cache进行获取,这里的FetchFromCentralCache函数也是thread cache类中的一个成员函数。
//申请内存
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);//对齐后,需要的字节个数
size_t index = SizeClass::Index(size);//该字节对应的段下标
if (!_freeLists[index].Empty())//该链表下面有空闲内存
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index, alignSize);//到中心缓存申请
}
}
6.5 thread cacheTLS无锁访问
每个线程对应都有属于自己的thread cache,因此我们不能把thread cache设置为全局全局变量(全局变量被所有线程共享),这样的话就需要加锁控制。
我们可以使用线程局部存储TLS(Thread Local Storage),这种变量存储方法保证每个线程能无锁访问属于自己的thread cache。TLS:使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,保证了数据的线程独立性。
TLS相关文章:
Thread Local Storage(线程局部存储)
//ThreadCache.h 头文件
class ThreadCache
{
public:
//申请内存对象
void* Allocate(size_t size);
private:
FreeList _freeLists[NFREELISTS]; //哈希桶
};
static _declspec(thread) ThreadCache* pTLS_ThreadCache = nullptr;
不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache,因此在申请内存的函数中会包含以下逻辑。
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLS_ThreadCache == nullptr)
{
pTLS_ThreadCache = new ThreadCache;
}
七、central cache
7.1 central cache整体设计
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,每个映射桶下面的span中的大内存块按映射关系切成了一个个小内存块对象挂在span的自由链表中。
补充:
thread cache是每个线程独享的,而central cache是所有线程共享的,因为每个线程的thread cache没有内存了都会去找central cache,因此在访问central cache时是要加锁的。 但central cache在加锁时并不是将整个central cache全部锁上了,central cache在加锁时用的是桶锁,也就是说每个桶都有一个锁。此时只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争,如果是多个线程同时访问central cache的不同桶就不会存在锁竞争。
central cache与thread cache的第二个不同之处就是,thread cache的每个桶中挂的是一个个切好的内存块,而central cache的每个桶中挂的是一个个的span。每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个span里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶这些内存块被切成了对应的大小。
申请内存:
-
当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
-
central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
-
central cache的中挂的span中有一个use_count成员变量用来记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count
释放内存:
当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
7.2 central cache结构设计
每个程序运行起来后都有自己的进程地址空间,在32位平台下,进程地址空间的大小是 2 32 2^{32} 232;64位平台下,进程地址空间的大小是 2 64 2^{64} 264。
页的大小一般是4K或者8K,以8K( 8 ∗ 1024 = 2 13 8*1024 = 2^{13} 8∗1024=213)为例。在32位平台下,进程地址空间就可以被分成$ 2^{32} ÷ 2^{13} = 2{19}$个页;在64位平台下,进程地址空间就可以被分成$2{64}÷2{13}=2{51} $个页。页号本质与地址是一样的,它们都是一个编号,只不过地址是以一个字节为一个单位,而页是以多个字节为一个单位。
由于页号在64位平台下的取值范围是 [ 0 , 2 51 ) [0,2^{51}) [0,251),不能简单的用一个无符号整型来存储页号,这里可以使用条件编译:
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
//linux
#endif
需要注意的是,在32位下,_WIN32
有定义,_WIN64
没有定义;而在64位下,_WIN32
和_WIN64
都有定义。因此在条件编译时,我们应该先判断_WIN64
是否有定义,再判断_WIN32
是否有定义。
span的结构
span是一个管理以页为单位的大块内存,span的结构如下:
//管理以页为单位的大块内存
struct Span
{
PAGE_ID _pageId = 0; //大块内存起始页的页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双向链表结构
Span* _prev = nullptr;
size_t _objSize = 0; //切好的小对象的大小
size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数
void* _freeList = nullptr; //切好的小块内存的自由链表
bool _isUse = false; //是否在被使用
};
对于span管理的以页为单位的大块内存,我们需要知道这块内存具体在哪一个位置,便于之后page cache进行前后页的合并,因此span结构当中会记录所管理大块内存起始页的页号。
至于每一个span管理的到底是多少个页,这并不是固定的,需要根据多方面的因素来控制,因此span结构当中有一个_n成员,该成员就代表着该span管理的页的数量。
每个span管理的大块内存,都会被切成相应大小的内存块挂到当前span的自由链表中,比如8Byte哈希桶中的span,会被切成一个个8Byte大小的内存块挂到当前span的自由链表中,因此span结构中需要存储切好的小块内存的自由链表。
span结构当中的_useCount
成员记录的是,当前span中切好的小块内存,被分配给thread cache的计数,当某个span的_useCount
计数变为0时,代表当前span切出去的内存块对象全部还回来了,此时central cache就可以将这个span再还给page cache。
每个桶当中的span是以双链表的形式组织起来的,当我们需要将某个span归还给page cache时,就可以很方便的将该span从双链表结构中移出。如果用单链表结构的话就比较麻烦了,因为单链表在删除时,需要知道当前结点的前一个结点。
span双链表结构
central cache的每个哈希桶里面存储的都是一个双链表结构:
从双链表删除的span会还给下一层的page cache,相当于只是把这个span从双链表中移除,因此不需要对删除的span进行delete操作。
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
void Insert(Span* pos, Span* newSpan)//pos位置前插入newSpan
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)//不delete,因为pos是要交给thread
{
assert(pos);
//assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head == _head->_next;
}
void PushFront(Span* span)
{
Insert(Begin(), span);
}
Span* PopFront()
{
Span* front = Begin();
Erase(Begin());
return front;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
central cache的结构
central cache的映射规则和thread cache是一样的,因此central cache里面哈希桶的个数也是208,但central cache每个哈希桶中存储就是我们上面定义的双链表结构。
#pragma once
#include "Common.h"
class CentralCache
{
public:
//提供一个全局访问点
static CentralCache* GetInstance()
{
return &_sInst;
}
//从central cache获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t bachNum, size_t size);
//获取一个非空span
Span* GetOneSpan(SpanList& list, size_t size);
//将取出的对象还给central cache中对应的span
void ReleaseListToSpans(void* start, size_t size);
private:
CentralCache(){ }//构造函数私有
CentralCache(const CentralCache&) = delete;//防拷贝
private:
SpanList _spanLists[NFREELISTS];
static CentralCache _sInst;
};
central cache和thread cache的映射规则一样的好处:当thread cache的某个桶没有内存了,可以直接去central cache对应的哈希桶进行申请。
7.3 central cache类
单例模式创建
central cache和page cache在整个进程中只有一个,对于这种只能创建一个对象的类,我们可以将其设置为单例模式。
单例模式可以保证系统中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。单例模式又分为饿汉模式和懒汉模式,懒汉模式相对较复杂,我们这里使用饿汉模式就足够了。
饿汉模式:程序运行起来后我们就立马创建该对象,在此后的程序中只有这一个单例。
//CentralCache.h文件
class CentralCache
{
public:
//提供一个全局访问点
static CentralCache* GetInstance()
{
return &_sInst;
}
private:
CentralCache(){ }//构造函数私有
CentralCache(const CentralCache&) = delete;//防拷贝
private:
SpanList _spanLists[NFREELISTS];
static CentralCache _sInst;
};
//CentralCache.cpp文件
CentralCache CentralCache::_sInst;//初始化
慢开始反馈调节算法
慢开始反馈调节算法用于计算central cache应该分配给thread cache多少个内存块对象,当thread cache向central cache申请内存时,如果申请的是较小的对象,那么可以多给一点,但如果申请的是较大的对象,就可以少给一点。
//管理对齐和映射等关系
class SizeClass
{
public:
//thread cache一次从central cache获取对象的上限
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
//对象越大,计算出的上限越低
//对象越小,计算出的上限越高
int num = MAX_BYTES / size;
if (num < 2)
num = 2;//最少给两个
if (num > 512)
num = 512;//小对象,最多给512个
return num;
}
};
就算申请的是小对象,一次性给出512个也是比较多的,基于这个原因,我们在FreeList结构中增加一个叫做_maxSize
的成员变量,该变量的初始值设置为1,并且提供一个公有成员函数用于获取这个变量。也就是说,现在thread cache中的每个自由链表都会有一个自己的_maxSize
。
//管理切分好的小对象的自由链表
class FreeList
{
public:
size_t& MaxSize()
{
return _maxSize;
}
private:
void* _freeList = nullptr; //自由链表
size_t _maxSize = 1;
};
此时,当thread cache申请对象时,会比较_maxSize
和计算得出的值,取出其中的较小值作为本次申请对象的个数。此外,如果本次采用的是_maxSize
的值,那么还会将thread cache中该自由链表的_maxSize
的值进行加一。
因此,thread cache第一次向central cache申请某大小的对象时,申请到的都是一个,但下一次thread cache再向central cache申请同样大小的对象时,因为该自由链表中的_maxSize
增加了,最终就会申请到两个。直到该自由链表中_maxSize
的值,增长到超过计算出的值后就不会继续增长了,此后申请到的对象个数就是计算出的个数。
八、page cache
8.1 page cache整体设计
- Page cache是一个以
页
为单位的span自由链表 - 为了保证全局只有唯一的
Page cache
,这个类可以被设计成单例模式 - 本单例模式采用饿汉模式
page cache与central cache结构的相同之处
page cache与central cache一样,它们都是哈希桶的结构,并且page cache的每个哈希桶中里挂的也是一个个的span,这些span也是按照双链表的结构链接起来的。
page cache与central cache结构的不同之处
首先,central cache的映射规则与thread cache保持一致,而page cache的映射规则与它们都不相同。page cache的哈希桶映射规则采用的是直接定址法,比如1号桶挂的都是1页的span,2号桶挂的都是2页的span,以此类推。
其次,central cache每个桶中的span被切成了一个个对应大小的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,而如何切分申请到的这个span就应该由central cache自己来决定。
至于page cache当中究竟有多少个桶,这就要看你最大想挂几页的span了,这里我们就最大挂128页的span,为了让桶号与页号对应起来,我们可以将第0号桶空出来不用,因此我们需要将哈希桶的个数设置为129。
//page cache中哈希桶的个数
static const size_t NPAGES = 129;
为什么这里最大挂128页的span呢?因为线程申请单个对象最大是256KB,而128页可以被切成4个256KB的对象,因此是足够的。当然,如果你想在page cache中挂更大的span也是可以的,根据具体的需求进行设置就行了。
8.2 page cache申请释放
申请内存:
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页pagespan分裂为一个4页page span和一个6页page span。
- 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
- 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第 i 号桶中挂的span都是i页内存。
释放内存:
- 如果central cache释放回一个span,**则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。**但是合并的最大页数超过128页,就不能合并。
- 如果ThreadCache想直接申请大于64k的内存,直接去PageCache去申请,当在PageCache申请时,如果申请的内存大于128页,则直接向系统申请这块内存,如果小于128页,则去SpanList去查找。
8.3 page cache类
#pragma once
#include"Common.h"
#include "PageMap.h"
class PageCache
{
public:
static PageCache* GetInstance()//全局访问点
{
return &_sInst;
}
//从自己的桶里获取一个k页的span
Span* NewSpan(size_t k);
//获取对象到span的映射
Span* MapObjectToSpan(void* obj);
//释放空闲的span回到PageCache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
private:
SpanList _spanLists[NPAGES];
ObjectPool<Span> _spanPool;
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
public:
// 为了锁住SpanList,可能会存在多个线程同时来PageCache申请span
std::mutex _pageMtx;//page的大锁
private:
PageCache(){ }//构造函数私有
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
九、项目源码
Gitee:高并发内存池项目: 模拟学习tcmalloc (gitee.com)