一、高并发内存池框架设计
高并发池框架设计,特别是针对内存池的设计,需要充分考虑多线程环境下:
- 性能问题
- 锁竞争问题
- 内存碎片问题
高并发内存池的整体框架设计旨在提高内存的申请和释放效率,减少锁竞争和内存碎片。
高并发内存池设计的基本框架包括**ThreadCache(线程缓存)、CentralCache(中心缓存)和PageCache(页缓存)**三个主要部分。主要组成部分包括:
接下来,我们先粗略介绍一下它们各自的结构和其之间的联系,有利于后续创建每一层时联系上下层编写所需的代码。
(一)ThreadCache的框架设计
功能:每个线程独有的内存缓存,用于小于一定大小(如256KB)的内存的分配。线程从这里申请内存不需要加锁,每个线程独享一个Cache,大大提高了并发性能。
结构:通常设计为哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。这样,线程在申请或释放内存时,可以直接在自己的ThreadCache中进行,无需与其他线程竞争,无须加锁。
内存管理:支持不同大小的内存块申请,通过哈希映射和对齐规则,将不同大小的内存块映射到不同的哈希桶中。
1.ThreadCache申请内存
- 线程A向ThreadCache申请内存,申请的内存大小≤256KB;
- 线程A向ThreadCache申请内存就是从ThreadCache中获取空闲的内存块,从而寻找ThreadCache中的空闲内存块使用;
- 通过映射关系找到对应的哈希桶,查看对应的哈希桶中是否拥有空闲内存块。如果有则直接使用(Pop一个空闲内存块返回),如果没有则向上一层CentralCache对应的哈希桶中申请空闲内存块插入到自由链表中,再Pop给线程A;
- 由于每个线程都有属于自己的Thread,因此我们引入一个知识点TLS无锁访问使得每个线程都可以有属于自己的Thread。
2.ThreadCache释放内存
- 线程A释放内存,ThreadCache回收内存块,内存大小依旧局限于256KB;
- 正常回收到对应的自由链表中;
- 当某条自由链表过长时,将其释放给CentralCache。
哈希桶的结构:
- 由_freeList自由链表挂在对应的哈希桶位置组成;
- 每个哈希桶与其悬挂自由链表中的内存块存在映射规则(如图)。
3.如何划分哈希桶布局-完成ThreadCache中的小组件
比如在向 ThreadCache 申请内存时,我们申请size个字节的内存,需要去有该size字节的对应的哈希桶中查找是否有空闲的内存块,如果有则使用(如果没有,则要去CentralCache中申请内存块)。
因此,我们要考虑的变量一是对应的哈希桶,二是向CentralCache申请内存时,大小应该为多少。
- 在ThreadCache中如何通过申请size个字节,找到对应的哈希桶。
- 如何通过size个字节,找到向CentralCache申请的内存块大小(找size对齐后的alignSize)。
256KB划分情况如下(整体控制在最多10%左右的内存碎片浪费):
字节数 | 对齐数 | 哈希桶下标 |
---|---|---|
[ 1 , 128 ] | 8 | [ 0 , 16 ) |
[ 128+1 , 1024 ] | 16 | [ 16 , 72 ) |
[ 1024+1 , 8 * 1024 ] | 128 | [ 72 , 128 ) |
[ 8 * 1024+1 , 64 * 1024 ] | 1024 | [ 128 , 184 ) |
[ 64 * 1024+1 , 256 * 1024 ] | 8 * 1024 | [ 184 , 208 ) |
对齐数就是我们在分配内存时最后得到的内存大小一定是对齐数的倍数,比如我们申请129个字节,由表得知,它的对齐数是16字节,因此对齐后的大小应该是129÷16=8……1,还需要15个字节才能对齐,对齐后所需要的内存大小为129+15 = 144,也就是说浪费了15个字节。在前篇文章中,我们也已经介绍过内碎片的存在,这份浪费的15字节就是内碎片。
我们可以通过公式:
浪费率 = 浪费的字节数/对齐后的字节数;
得到 15÷144 ≈ 10%,依次类推,整体控制在10%左右的内存碎片浪费。对于1~128这个区间不做讨论。
要想得到某个区间最大浪费率,我们可以通过让分子变大,分母变小的方式来得到该区域的最大浪费率。
因此可以得到后几组对齐数的最大浪费率:
对齐数为128,对齐后字节数为1152:127÷1152 ≈ 11.02%;
对齐数为1024,对齐后字节数为9216:1023÷9216 ≈ 11.10%;
对齐数为8*1024=8192,对齐后字节数为1152:8192÷73728 ≈ 11.11%
4.对象的映射规则
通过对齐数得到对齐后的字节数
如申请内存为14字节,在0~128的区间中对齐数为8,得到对齐后的字节数16
通过对齐数得到对应的哈希桶
如申请的内存为15字节,在0~128的区间中对齐数为8,得到(15+(8-1))/8 -1 = 1,对应的哈希桶为一号桶
将两种映射规则放在类SizeClass中。
// 申请size字节,通过对齐数获取对齐后的字节数alignNum
/*static inline size_t _RoundUp(size_t size, size_t alignNum) // RoundUp 的子函数
{
// 找size对齐后的对齐后的字节数,也就是申请的内存大小alignNum
if (size % alignNum == 0)
return size;
else
return (bytes / alignNum + 1)*alignNum;
}*/
static inline size_t _RoundUp(size_t size, size_t alignNum)
{
return (size + (alignNum - 1)) & ~(alignNum - 1);
}
// size 的大小 size>=0 size<=256KB
static inline size_t RoundUp(size_t size)
{
assert(size <= MAX_BYTES); // MAX_BYTES = 256 * 1024;
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
assert(false);// 出错了
return -1;
}
}
// 对象申请的内存大小与对应的哈希桶之间的映射
//方法一
// 申请内存与对应的哈希桶之间的映射
static inline size_t _Index(size_t size, size_t align_shift)
{
return ((size + (1 << align_shift) - 1) >> align_shift) - 1;
}
static inline size_t Index(size_t size)
{
assert(size <= MAX_BYTES);
static int group_array[4] = { 16, 56, 56, 56 };
if (size <= 128)
{
return _Index(size, 3);
}
else if (size <= 1024)
{
return _Index(size - 128, 4) + group_array[0];
}
else if (size <= 8 * 1024)
{
return _Index(size - 1024, 7) + group_array[0] + group_array[1];
}
else if (size <= 64 * 1024)
{
return _Index(size - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
}
else if (size <= 256 * 1024)
{
return _Index(size - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
}
else
{
assert(false);
return -1;
}
}
//方法二
/*
static inline size_t _Index(size_t bytes, size_t align)
{
if (bytes % align == 0)
return bytes / align - 1;
else
return bytes / align;
}
static inline size_t Index(size_t Bytes)
{
assert(Bytes <= MAX_BYTES);
// 每个桶有多少个节点
static int group_array[4] = { 16, 56, 56, 56 };
if (Bytes <= 128) {
return _Index(Bytes, 8);
}
else if (Bytes <= 1024) {
return _Index(Bytes - 128, 16) + group_array[0];
}
else if (Bytes <= 8 * 1024) {
return _Index(Bytes - 1024, 128) + group_array[1] + group_array[0];
}
else if (Bytes <= 64 * 1024) {
return _Index(Bytes - 8 * 1024, 1024) + group_array[2] + group_array[1] + group_array[0];
}
else if (Bytes <= 256 * 1024) {
return _Index(Bytes - 64 * 1024, 8192) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false);
}
return -1;
}
*/
需要注意的是,SizeClass类当中的成员函数最好设置为静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数。
(二)CentralCache的框架设计
CentralCache的结构与ThreadCache相似,映射关系也相似,但是由图得知仍然存在不同。
功能:CentralCache作为所有线程共享的内存池,确保了内存资源在多个线程之间的公平分配和高效利用。也就是整个进程中只有一个CentralCache,可以通过设计一个单例模式来实现,由于每次访问的都是共同的CentralCache,在进行读写的情况下,都要对其加锁处理(因为线程1在读的时候,可能线程2正在申请释放内存块,导致线程1获取到的内存块并不是可利用的空闲内存)
结构:采用哈希桶结构,但映射规则与ThreadCache相同。每个哈希桶下悬挂的是名为Span的变量,而每个span中都会指向一个自由链表,自由链表下悬挂的内存块大小与它的桶号一一对应。
内存管理:CentralCache负责为各个线程的ThreadCache分配内存。当ThreadCache中的内存不足时,会向CentralCache申请新的内存块。CentralCache会在适当的时候回收ThreadCache中不再使用的内存,以避免内存浪费和碎片化。
CentralCache与PageCache的框架结构中具有相似的Span结构,这也意味着又有一个共同点设计在Common.h文件中,在这里我们就不像ThreadCache中的自由链表与哈希桶的映射关系一样单独拿出来解释,因为我们对Span的设计,会随着各层之间的联系发生变化,比如增加多个变量,如果在这里就进行介绍,在后续的讲解中可能会遗忘。
1.CentralCache申请内存
- ThreadCache向CentralCache申请内存,首先还是先找到对应哈希桶中的空闲内存块,也就是说要分析每个Span,找到非空Span后,将一块(或一批量)内存块挂在ThreadCache对应自由链表下;
- 当对应哈希桶下,没有非空的Span时,意味着ThreadCache无法获取到空闲内存块,因此CentralCache需要向上一层的PageCache中申请空闲的Span利用,申请到这份NewSpan后,还需要对这份NewSpan做处理,将其划分为合适大小的内存块,悬挂在Span中的自由链表下进行管理,然后就可以将这份新的非空Span中的内存块划分给ThreadCache;
- 需要注意的是我们在申请内存访问CentralCache时,需要加上对应的桶锁,避免其他同时间对CentralCache的操作影响我们的结果。
2.CentralCache释放内存
- CentralCache释放内存有两个大方向,一个是从ThreadCache中回收内存块,一个是回收后的内存块使得部分Span处于完全空闲状态,可以释放给PageCache,增加空间利用;
- 回收从ThreadCache中释放的内存:当ThreadCache中某一自由链表下悬挂的空闲内存块过多时(什么样的判断标准),会由CentralCache进行回收,这些内存块都是有对应哈希桶中的Span下的内存块分配出去的,但是属于哪个Span是不确定的,因此在我们创建Span时就可以将内存块的地址与Span做映射关系,确保回收回的内存块可以找到对应的Span,至于怎么设计这份映射关系,后面再讨论;
- 将完全空闲的Span释放给PageCache:如何判断这个Span是完全空闲的?也就是它管理的自由链表中没有被使用的内存块,设计一个_usecount变量进行管理,当分配出去对象时,_usecount就++,当_usecount为0时就表示所有对象回到了Span中,则将Span释放回PageCache,需要相邻的空闲Span合并,挂在PageCache对应的哈希桶下;
- 但是当我们从PageCache中申请的Span时这份Span也是空闲的,如果与要释放的Span合并起来该怎么办?在物理空间中,无论我们怎么切分内存空间,它们在物理位置上并没有发生变化,因此在我们查找相邻空闲Span的地址时,可能会导致合并后的Span一部分在CentralCache中,一部分在PageCache,一部分难以访问到的情况,因为如果与CentralCache中相邻Span合并,这部分Span仍然处于CentralCache中并没有被释放到PageCache中(如下图所示);
- 也就是说想要合并Span有两个条件:空闲的Span和在PageCache中。思考:如何判断Span时空闲Span,如何判断Span是在PageCache中的?
- 将某个Span合并到PageCache中后,别忘了在CentralCache中对应的SpanList中删除哦,不然会出错的。
(三)PageCache的框架设计
功能:作为内存池的最上层缓存,以页为单位存储和分配内存。当CentralCache内存不足时,PageCache会向系统申请新的内存页,并切割成小块内存分配给CentralCache。当一个Span的几个跨度页的对象都回收以后,PageCache会回收CentralCache满足条件的Span对象,并且合并相邻的页组成更大的页,缓解内存碎片的问题。
结构:通常也采用哈希桶结构,但映射规则与ThreadCache和CentralCache不同,主要按页号进行映射。
内存管理:PageCache负责大块内存的分配和回收,以及与系统的内存交互。同时,也会合并相邻的空闲页,以减少内存碎片。
1.PageCach申请内存
- CentralCache从PageCache中申请内存,对应哈希桶恰好有Span,切割好内存后交给CentralCache;
- 向页数更大的哈希桶中查找Span,找到并切割合适的Span给CentralCache,要记得对切割前的Span和切割后的Span要清理和挂在对应的位置。比如从100页切割了42页大小的Span给CentralCache,那么原先的100页Span要从100page的哈希桶中删除,并将58页大小Span重新挂在58page的哈希桶中,记得更新Span的自身信息;
- 当PageCache中无合适的Span时,需要从系统中申请内存,通过使用brk、mmp或者是VirtualAlloc等方式从操作系统中申请128页Span,挂在对应哈希桶中。
- 注意PageCache与前两者的映射关系不同,PageCache中的第i号桶中挂的Span都是i页内存。
2.PageCach释放内存
- PageCache回收CentralCache释放的内存,无可合并的Span,将释放回的Span挂在对应哈希桶中;
- PageCache中有可合并的相邻的Span,合并直到无相邻Span或最大为128页后不再合并,减少内存碎片问题
- PageCache释放内存给操作系统。
二、ThreadCache(线程缓存)_内存设计
(一)处理哈希桶相关问题
首先创建一个ThreadCache哈希桶,我们需要先创建它其中的小组件。
从上列文字和图形中,我们了解到ThreadCache是由不同内存块组成的自由链表挂在对应哈希桶上的结构。
创建哈希桶中每个桶所属的自由链表,由于CentralCache也是哈希桶结构,我们的自由链表创建可以放在一个公共的头文件中,方便使用。将自由链表创建为类,管理切分好的小对象的自由链表。
// 这里是一个共同访问的Common.h头文件,会被TCMalloc不同层使用到的相同变量放在此处
#include <iostream>
#include <vector>
#include<assert.h>
using std::cout;
using std::endl;
void*& NextObj(void* obj) {
return *(void**)obj;
}
// 创建ThreadCache和CentralCache都会使用到的_freelist
// 线程会从自由链表从申请到内存或释放内存块回到自由链表
//
class FreeList
{
public:
// 线程申请内存就是从对应的自由链表中取出内存块(头删)
void Push(void* obj)
{
NextObj(obj) = _freeList;
_freeList = obj;
}
// 线程释放内存就是将内存块释放到对应的自由链表中(头插)
void* Pop()
{
assert(_freeList); // 自由链表为空时,无法取出空闲内存块
void* obj = _freeList;
_freeList = NextObj(_freeList);
return obj;
}
// 判断自由链表中是否存在内存
bool Empty()
{
if (_freeList == nullptr)
return true;
return false;
}
private:
void* _freeList = nullptr;
};
我们现在可以通过FreeLsit创建一个哈希桶了。
接下来是对哈希桶进行管理,对于哈希桶的映射关系,如果还是不了解,去查看ThreadCache的框架设计中的讲解
(二)ThreadCache–申请内存
// 放在公用头文件中
// thread cache和central cache自由链表哈希桶的表大小
static const int NFREELISTS = 208;
// 小于 256 * 1024 向 ThreadCache 申请
static const int MAX_BYTES = 256 * 1024;
// 大于则向 PageCache 或者 堆上申请,这里我们暂时不多思考,一步一步来
- 当内存申请size<=256KB,先获取到线程本地存储的ThreadCache对象,计算size映射的对齐后的对象大小和哈希桶自由链表下标 Index 。
- 申请内存时,如果ThreadCache对应哈希桶中的自由链表**(_freeLists[index])**有空闲内存块时,则直接Pop()一个内存对象返回。
- 申请内存时,如果ThreadCache对应哈希桶中的自由链表**(_freeLists[index])**没有空闲内存块时,则批量从CentralCache中获取对齐后大小的数量的对象,插入到自由链表并返回一个对象。
// ThreadCache.h
#pragma once
#include "CommonPool.h"
class ThreadCache
{
private:
FreeList _freeLists[NFREELISTS]; // 哈希桶
public:
// 申请和释放对象内存
void* ThreadAlloc(size_t size);
void* ThreadFree(void* obj,size_t size);
// 从CentalCache中申请size内存
void* FetchFromCentralCache(size_t index,size_t size);
};
定义ThreadCache 中的成员函数,并且完善其他类。
/* 申请内存 */
/* ThreadCache.cpp */
void* ThreadCache::ThreadAlloc(size_t size)
{
// size 一定是小于等于256KB,且申请获得的空间大小一定是向上对齐的
assert(size <= MAX_BYTES);
// 根据对象申请的大小,在对应的哈希桶中找到对应的内存块
// 如果有就直接使用,如果没有就需要先从CentralCache中申请对应内存块给ThreadCache
size_t alignBytes = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
// 接下来申请内存,注意我们要在对应的哈希桶中找到空闲的内存块使用
if (!_freeLists[index].Empty())
{
return _freeLists[index].Pop();// 从该哈希桶中的存放空闲内存块的自由链表中取出空闲内存块给对象
}
else
{
// 如果对应的哈希桶中没有空闲内存块,需要向centralcache中申请内存块分配给threadcache
// 申请到的内存块要分配给以alignBytes为对齐数,放在在下标位index的哈希桶中
return FetchFromCentralCache(index, alignBytes);
}
}
关于 FetchFromCentralCache(index, alignSize); ,需要联系到 CentralCache ,后续再讲解。
(三)线程局部存储–无锁访问ThreadCache
Thread Local Storage(线程局部存储)TLS
线程本地存储(Thread Local Storage) - 坦坦荡荡 - 博客园
如果一个变量是全局的,那么所有线程访问的是同一份,某一个线程对其修改会影响其他所有线程。如果希望每个线程访问到这个变量,并且不会影响其他线程中的这个变量,那么我们该怎么创建?
我们的高并发内存池就是 每个线程独有一个ThreadCache线程缓存,互不影响,也无需加锁,那么如何创建每个线程各自的ThreadCache?
因此我们引入一个概念:
如果我们需要一个变量在每个线程中都能访问,并且这个变量在每个线程中互不影响,这就是TLS。
线程局部存储TLS(Thread Local Storage)是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
TLS依情况而定有Windows下的,也有Linux下的。既有动态也有静态,这里我们选择静态TLS:
_declspec(thread) DWORD data=0;
声明了_declspec(thread)的变量,会为每一个线程创建一个单独的拷贝。
// TLS thread local storage(线程局部存储、线程本地存储)
// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
// thread 用于声明一个线程本地变量,
// _declspec(thread)的前缀是Microsoft添加给Visual C++编译器的一个修改符。
上列代码在ThreadCache.h头文件中声明了为每个线程创建一份ThreadCache。但也仅仅是声明,并不是为每个线程创建了属于自己的ThreadCache。
下列代码是对每个线程创建属于自己的ThreadCache,当该线程调用相关申请内存的接口时才会创建自己的ThreadCache。
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
(四)ThreadCache–封装使用及测试
1.对ThreadCache对象的使用进行封装
在使用过TLS后,对每一个线程创建一份ThreadCache时,将申请(释放)的功能进行再一次的封装,有利于后续对象申请(释放)空间直接使用。
// 我们创建一个新的头文件 Concurrent.h
// 在这里将threadcache封装起来以便对象申请释放内存时更方便使用
#include "Common.h"
#include "ThreadCache.h"
static void* concurrentAlloc(size_t size)
{
// 为每一个线程申请属于他自己的threadcache
if (PTLSThreadCache == nullptr)
{
PTLSThreadCache = new ThreadCache;
// 这里可以连同后面的测试 编写一段代码
// cout<< "Thread x creates a threadcache object."<< endl ;
}
// get_id()它用于获取当前线程的唯一标识符。
// 每个线程在程序执行期间都有一个唯一的标识符,这个函数返回一个表示当前线程ID的对象。
//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
// 调用threadcache申请内存
return PTLSThreadCache->Allocate(size);
}
2.测试每个线程有属于自己的ThreadCache对象
在Test.cpp文件中,我们对上列代码进行测试,观察线程之间使用同一变量是否会互相影响,测试每一个线程是否获得了属于自己的ThreadCache对象。
// Test.cpp
#include <thread>
void Alloc1()
{
for (size_t i = 0; i < 5; ++i)
{
//申请6字节大小的内存对象
void* ptr = concurrentAlloc(6);
}
}
void Alloc2()
{
for (size_t i = 0; i < 5; ++i)
{
//申请9字节大小的内存对象
void* ptr = concurrentAlloc(9);
}
}
void TextpTLSThreadCache()
{
std::thread t1(Alloc1);
std::thread t2(Alloc2);
t1.join();
t2.join();
}
int main()
{
//测试每个线程是否有属于自己的ThreadCache
TextpTLSThreadCache();
return 0;
}
这种错误通常是因为在多个源文件中定义了相同的函数或变量,导致链接器在链接阶段发现重复定义而报错。造成原因:重复定义,头文件包含,静态库冲突。
可以通过使用 inline 关键字、static 关键字、分离声明和定义以及检查头文件保护机制来解决这个问题。
测试结果:
调试过程中可以看到两个线程互不影响地运行着。
从测试结果看,两个线程是同时进行的,并且互不影响。