目录
整体框架介绍
ThreadCache的主体框架
自由链表-FreeList
内存对齐-RoundUp
计算桶位置-Index
基础版
进阶版
线程局部存储
__declspec(thread) 关键字
实现线程无锁
申请内存-Allocate
释放内存-Deallocate
从中心缓存中申请内存
整体框架介绍
高并发内存池(concurrent memory pool)主要由以下三个部分构成:
- 线程缓存(Thread Cache):是哈希桶结构,每个桶下都挂有一个自由链表,每个线程独享,用于分配单次申请的内存小于256KB的情况(而不是说它一共可分配的内存为256KB),每个线程从这里申请内存不需要加锁,更加高效
- 中心缓存(Central Cache):所有线程共享,故使用桶锁来解决各个线程在申请内存时存在的竞争关系(因为只有某个线程缓存没有足够的内存时才会向中心缓存申请,所以这里的竞争没有那么激烈),每个线程的线程缓存会按需求从中心缓存中获取内存,中心缓存再在合适的时机回收线程缓存中的内存,从而达到内存分配在多线程中更加的均衡,中心缓存没对象时会去页缓存申请页
- 页缓存(Page Cache):以页为单位进行存储和分配,中心缓存没有足够的内存对象时,页缓存会分配一定数量的页,并切割成小块内存,分配给中心缓存,当一个span管理的几个页对象都回收后,页缓存会回收中心缓存中满足条件的span对象,并且合并相邻的页,组成更大的页,从而缓解内存碎片问题
哈希桶:具有相同映射关系的对象归于同一子集合,每一个子集合称为一个哈希桶,各个桶中的元素通过一个单链表链接起来,各链表的头结点存储在哈希表中
ThreadCache的主体框架
//定义在common.h中
static const size_t NFREELIST = 208;//提前计算出需要208个桶(后面会有解释)
static const size_t NFREELIST = 208;//规定好有208个桶(后面会有解释)
//定义在ThreadCache.h中
class ThreadCache
{
public:
void* Allocate(size_t bytes);//申请内存
void Deallocate(void* ptr, size_t size);//释放内存
//从中心缓存中获取内存
void* FetchFromCentralCache(size_t index, size_t size);
private:
FreeList _freeLists[NFREELIST];//208个桶(自由链表)
};
//TLS thread local storage(后续会说明)
//static保证该指针只在当前文件可见防止因为多个头文件包含导致的链接时出现多个相同名称的指针
static _declspec(thread)ThreadCache* pTLSThreadCache = nullptr;
//_declspec(thread)关键字指定 pTLSThreadCache 变量是一个线程局部存储(TLS)变量.
//这意味着 pTLSThreadCache 指针是每个线程独有的,当一个线程使用 pTLSThreadCache 时,它访问的是与其他线程完全独立的内存空间。
自由链表-FreeList
基本概念:链表中的各个结点都是归还回来的小块内存
//static修饰防止重名,同时传引用返回防止拷贝
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
//管理小块内存的自由链表
class FreeList
{
public:
//头插
void Push(void* obj)
{
assert(obj);//要插入的对象不能为空
NextObj(obj) = _freeList;
_freeList = obj;
}
//头删
void* Pop()
{
assert(_freeList);//自由链表不能为空
void* obj = _freeList;
_freeList = NextObj(obj);
return obj;
}
//判空
bool Empty()
{
return _freeList == nullptr;
}
//最大结点个数
size_t& MaxSize()
{
return _maxSize;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;//用于控制自由链表中结点的最大个数
};
- NextObj(void* obj):用每个链表结点的前4/8个字节(取决于位环境)存放下一个结点的地址,但如果
- void* _freelist::void*类型的指针是通用指针类型,可以指向任何类型的数据。这意味着自由链表的每个节点都可以容纳任何类型的数据结构
int a = 2;
int* ptr = &a;
void* vptr = ptr;
- size_t& MaxSize():用于文章末尾的慢开始调节算法
内存对齐-RoundUp
需求原因:thread cache支持单次内存小于等于256KB的申请,如果我们将每种字节数的内存块都用自由链表来管理,就需要256 * 1024 = 262,144个桶,而存储桶中的自由链表的头指针就需要消耗大量的内存(一个桶下就有一个自由链表)
解决办法:将每次申请的内存大小size按照某种规则进行内存对齐
新问题:对齐数应该大于4字节,因为自由链表中的每个结点都需要存放下一个结点的地址,如果对齐数设为4,则在32位环境下size为3时对齐后为4,可以放下一个指针,但在64位环境下size为3对齐后为4,不能放下一个指针,所以最小对齐数应该为8(惯例,取4或8或16等2的倍数便于OS进行内存管理),但若我们将对齐数均取为8,仍然需要32767个桶
假设有一块8字节的内存,初始时它处于空闲状态:
//32位环境 [ 指向下一个块的地址 | 未使用 ] [ 4字节指针 | 4字节未使用 ] //64位环境 [ 指向下一个块的地址 ] [ 8字节指针 ]
当这个块被分配给一个变量后,整个8字节就可以用来存储变量的数据:
//32位环境下: [ 变量数据 | 变量数据 ] [ 4字节数据 | 4字节数据 ] //64位环境下: [ 变量数据 | 变量数据 ] [ 4字节数据 | 4字节数据 ]
即空闲时的结点中不会存放数据只有下一个结点的地址,所以不用担心64位环境下size = 3,对齐后为8但是放不下一个指针的问题(之前我一直在纠结这里🤡)
最终解决办法:按照size所属的字节范围选用不同的对齐数,即一段范围的值对应一个桶
[1,128]:对齐数为8,一共16个自由链表:
8->8->
16->16->
...
128->128->
[128+1,1024]:对齐数为16,一共56个自由链表:
129->129->
145->145->
...
1024->1024->
...
[64*1024+1,256*1024]:对齐数为8*1024,一共56个自由链表:
64*1024+1->64*1024+1->
64*1024+8*1024->64*1024+8*1024->
...
256*1024->256*1024->
优点:减少高并发内存池(一):项目介绍与定长内存池的实现中提到的内碎片,提高资源利用率,每次分配出去的内存中最多有10%左右的内碎片浪费(size = 15,在[1,128]范围内,按8对齐后的内碎片为1,1 / 16 * 100% = 6.25% ≈ 10%)
对齐规则:
申请的size大小 对齐数 桶/自由链表的个数
[1,128] 按8byte对齐 freelist[0,16)
[128+1.1024] 按16byte对齐 freelist[16,72)
[1024+1,8*1024] 按128byte对齐 freelist[72,128)
[8*1024+1,64*1024] 按1024byte对齐 freelist[128,184)
[64*1024+1,256*1024] 按8*1024byte对齐 freelist[184,208)
//计算对象大小的对齐映射规则
class SizeClass
{
public:
//_函数名:表示一个子/辅助函数
//bytes:申请的内存大小
//alignNum:规定的对齐数
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
size_t alignSize;//对齐后大小
if (bytes % 8 != 0)//不满足初始的以8byte对齐就按照
{
alignSize = (bytes / alignSize + 1) * alignNum;
}
else
{
alignSize = bytes;
}
return alignSize;
}
//对齐函数
static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if(size <= 8*1024)
{
return _RoundUp(size, 128);
}
else if(size <= 64*1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8*1024);
}
else
{
//到这里表示必然出错,直接assert退出即可
assert(false);
return -1;
}
}
};
- 内联函数:减少函数调用的开销,提高程序的运行效率
- 通过位运算实现的计算对齐规则的子函数,虽然这样进行位运算更快,但很难想到:
static inline size_t _RoundUp(size_t bytes, size_t align)
{
return (((bytes)+align - 1) & ~(align - 1));
}
注意事项:这只是为了减少桶的个数而设计的对齐方案,如何找到对应的桶在下面的内容中
计算桶位置-Index
基础版
//基础版寻找桶位置
static inline size_t Index(size_t bytes,size_t alignnum)//申请内存,对齐数
{
if (bytes % alignnum == 0)//刚刚好和对齐数一样
{
return bytes / alignnum - 1;//第一个桶的下标为0,故后续桶计算出的位置要-1
}
else
{
return bytes / alignnum;
}
}
//传递参数的函数与进阶版中的类似,这里不再写
- bytes = 8:alignnum = 8,8 % 8 = 0,8 / 8 - 1 = 0,应该位于第一个桶下的自由链表
- bytes = 9:alignnum = 8,9 / 8 = 1,应该位于第二个桶下的自由链表
进阶版
//进阶版寻找桶位置:
static inline size_t _Index(size_t bytes, size_t align_shift)//申请内存,对齐数的次方
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);//确保传入申请内存的最大大小不超过256KB,MAX_BYTES会额外定义
static int group_array[4] = { 16,56,56,56 };//提前写出计算好的每个链表的个数
if (bytes <= 128)
{
return _Index(bytes,3);
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 4) + group_array[0];//加上上一个范围内的桶的个数
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
}
else
{
assert(false);
return -1;
}
}
- bytes = 56,align_shift = 3:(bytes + (1 << align_shift) - 1),56+7=63
- >> align_shift-1:63 >> - 1 = 6,应该位于第七个桶下的自由链表
- 依旧是位运算更快所以才会有进阶版,但实际有点难想到
线程局部存储
基本概念: 允许每个线程有自己的一份数据拷贝,这样多个线程可以同时运行相同的代码,而不必担心会干扰其他线程的数据
__declspec(thread)
关键字
基本概念:是一个用于在Windows平台上声明线程局部存储变量的关键字,它会为每个线程创建一个独立的数据副本,每个线程对这些数据的读写操作都是独立的
~下面的三种用法是补充内容,了解即可~
基本用法:
被__declspec(thread)
修饰的变量会为每个线程创建一个独立的副本。每个线程对这些变量的读写操作都是线程独立的。__declspec(thread) int tlsVar = 0; void SomeFunction() { tlsVar++; std::cout << "Thread " << GetCurrentThreadId() << ": tlsVar = " << tlsVar << std::endl; }
在本例子中,
tlsVar
是一个线程局部变量,每个线程都有自己的tlsVar
实例SomeFunction
函数可以在多个线程中并发执行,每个线程都会修改自己的tlsVar
,而不会影响其他线程的tlsVar
。
在类中使用:
__declspec(thread)
也可以用于修饰类的成员变量,只要这些成员是静态的class MyClass { public: static __declspec(thread) int tlsMember; }; __declspec(thread) int MyClass::tlsMember = 0; void SomeFunction() { MyClass::tlsMember++; std::cout << "Thread " << GetCurrentThreadId() << ": tlsMember = " << MyClass::tlsMember << std::endl; }
MyClass::tlsMember
是一个静态的线程局部变量,每个线程都有自己的tlsMember
实例
在多文件中使用:被
__declspec(thread)
修饰的变量可以在多个编译单元(即多个源文件)中使用,但需要确保在每个源文件中正确声明和定义变量// header.h #ifndef HEADER_H #define HEADER_H extern __declspec(thread) int tlsVar; void SomeFunction(); #endif // source1.cpp #include "header.h" __declspec(thread) int tlsVar = 0; void SomeFunction() { tlsVar++; std::cout << "Thread " << GetCurrentThreadId() << ": tlsVar = " << tlsVar << std::endl; } // source2.cpp #include "header.h" void AnotherFunction() { tlsVar += 10; std::cout << "Thread " << GetCurrentThreadId() << ": tlsVar = " << tlsVar << std::endl; }
tlsVar
在source1.cpp
文件中定义,但它可以在source2.cpp
文件中使用。每个线程都有自己独立的tlsVar
副本
注意事项:__declspec(thread)
是Microsoft的扩展,主要用于Windows平台和支持它的编译器(如Microsoft Visual C++)。在跨平台开发中使用时需要小心,如果目标平台不支持这个关键字,代码将无法编译
实现线程无锁
//处理并发执行的函数
static void* ConcurrentAlloc(size_t size)
{
//通过TLS方法,每个线程可以无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
//获取线程id(检测两个线程是否分到两个不同的pTLSThreadCache)
//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
//释放ThreadCache
static void ConcurrentAlloc(void* ptr,size_t size)
{
//理论上释放时pTLSThreadCache不会为空
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr,size);//调用Deallocate函数
}
下面是测试代码及过程(完整代码过多不再展示理解意思即可):
申请内存-Allocate
//调用ThreadCache中的申请内存对象
void* ThreadCache::Allocate(size_t size)
{
//范围
assert(size <= MAX_BYTES);
size_t allignSize = SizeClass::RoundUp(size);//获取对齐后的大小
size_t index = SizeClass::Index(size);//确认桶的位置
if (!_freeLists[index].Empty())//桶中的自由链表是否为空
{
return _freeLists[index].Pop();//头删相应位置的自由链表
}
else
{
return FetchFromCentralCache(index, allignSize);//向中心缓存处获取内容
}
}
释放内存-Deallocate
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);//大于256KB的内存不应该在这里归还
//找对映射的自由链表桶,并将用完的对象插入进去
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
}
从中心缓存中申请内存
注意事项:为了方便申请和释放内存,所以ThreadCache、CentralCache、PageCache三者的内存size与桶位置的映射关系是一样的
//向中心缓存申请
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//不断有size大小的内存需求,那么batchNum会不断增长直到上限,size越小上限越高,最高是512
size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
return nullptr;
}
- batchNum:一次性批发的某自由链表中结点的个数
慢开始调节算法
基本概念:源自于TCP拥塞控制算法中的一种机制,用于在连接刚开始时逐渐增加发送窗口大小,在这里是为了实现小块内存多申请,大块内存少申请的目标,避免最开始一次性向central cache申请过多的内存,因为要太多可能用不完
//thread cache一次可以从central cache中获取的span的个数
static size_t NumMoveSize(size_t size)//size表示要申请的对象的大小
{
if (size == 0)
{
return 0;
}
int num = MAX_BYTES / size;//计算需要可能的span个数
if (num < 2)
{
num = 2;
}
if (num > 512)
{
num = 512;
}
//num的取值范围是[2,512]
return num;
}
~over~