目录
1、项目介绍
2、高并发内存池整体框架设计
3、thread cache
<1>thread cache 哈希桶对齐规则
<2>Thread Cache类设计
4、Central Cache
<1>Central Cache类设计
5、page cache
<1>Page Cache类设计
6、性能分析
<1>定长内存池实现
<2>基数树
7、项目源码及项目总结
1、项目介绍
应用技术
什么是内存池?
想必大家看到这几个字也应该自己能想出个大概,简单来说内存池是指程序预先从操作系统申请一块足够大内存,然后自己管理。此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池主要解决了效率问题,避免频繁找操作系统申请内存。 其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。
那么什么是内存碎片呢?
内存碎片分为外碎片和内碎片,内碎片我们在下文项目中具体解释(这里我们简单概述一下),这里我们主要看比较容易理解的外碎片如图:
现有768Byte的空间,但是如果我们要申请超过512Byete的空间却申请不出来,因为这两块空间碎片化不连续了。
内碎片:内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。(具体见下文哈希桶对齐规则)
注:我们下面实现的内存池主要是是尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片的产生。
malloc
2、高并发内存池整体框架设计
- 性能问题。
- 多线程环境下,锁竞争问题。
- 内存碎片问题。
3、thread cache
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
thread cache支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的。
这时我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐(见下文对齐规则),例如我们让这些字节数都按照8字节进行向上对齐(考虑到32位和64位下指针大小),那么thread cache的结构就是下面这样的,此时当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推。
通过上图分析我们需要一个自由链表来管理内存块,下面我们来对自由链表进行封装(仅写出当前一些很容易想到的接口,后序需要我们在进行添加)。
class FreeList {
private:
void* _freelist = nullptr;
size_t _size = 0; //记录链表长度
size_t _MaxSize = 1; //控制慢增长
public:
void push(void* obj) {
assert(obj);
//头插
CurNext(obj) = _freelist;
_freelist = obj;
++_size;
}
void* popFront() {
assert(_freelist);
void* cur = _freelist;
_freelist = CurNext(_freelist);
--_size;
return cur;
}
bool Empty() {
return _freelist == nullptr;
}
size_t& Size() {
return _size;
}
size_t& MaxSize() {
return _MaxSize;
}
};
<1>thread cache 哈希桶对齐规则
字节数 | 对齐数 | 哈希桶下标 | 区间桶数量 |
[1,128] | 8byte对齐 | freelist[0,16) | 16 |
[128+1,1024] | 16byte对齐 | freelist[16,72) | 56 |
[1024+1,8*1024] | 128byte对齐 | freelist[72,128) | 56 |
[8*1024+1,64*1024] | 1024byte对齐 | freelist[128,184) | 56 |
[64*1024+1,256*1024] | 8*1024byte对齐 | freelist[184,208) | 24 |
上文中我们已经提到过内碎片这个概念,我们应该尽可能减少内碎片的产生。按照上面对齐规则的话整体控制在10%左右的内碎片浪费,第一个区间我们不做考虑因为1字节就算对齐到2字节也会产生50%的空间浪费,我们从第二个区间开始计算比如我们申请130个字节实际给到的是145字节实际多给了15字节。15/145 ≈ 0.103,浪费了大概在10左右,下面几个区间大家可以自己计算下浪费率也是大概在10%左右。
有了对齐规则我们还需要计算出相应的内存对应在哪一个桶中,如上表中每个区间都有一定桶的数量如[1,128]有16个桶那么我们申请1~8字节都对应在下标0号桶中,9~16字节都对应在下标1号桶中于是我们需要一个函数来处理计算。
对齐规则和下标映射规则编写
为了后序使用方便我们,将其封装在一个类当中。
static const size_t PAGE_SHIFT = 13;
static const size_t MAX_BYTES = 256 * 1024; //最大字节数
//计算对象大小对齐规则
class SizeClass {
public:
//20 8 --> 24
//110 8 --> 112
//容易想到的
//size_t _AlignSize(size_t bytes, size_t alignNum) {
// size_t alignSize;
// if (bytes % alignNum == 0) {
// alignSize = bytes;
// }
// else {
// alignSize = (bytes / alignNum + 1) * alignNum;
// }
// return alignSize;
//}
static inline size_t _AlignSize(size_t bytes, size_t alignNum){
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
//对齐大小计算
static inline size_t AlignSize(size_t bytes) {
//assert(bytes <= MAX_BYTES);
if (bytes <= 128) {
return _AlignSize(bytes, 8);
}
else if (bytes <= 1024) {
return _AlignSize(bytes, 16);
}
else if (bytes <= 8 * 1024) {
return _AlignSize(bytes, 128);
}
else if (bytes <= 64 * 1024) {
return _AlignSize(bytes, 1024);
}
else if (bytes <= 256 * 1024) {
return _AlignSize(bytes, 8 * 1024);
}
else {
return _AlignSize(bytes, 1 << PAGE_SHIFT); //页对齐
}
}
//容易想到的
//size_t _Index(size_t bytes, size_t alignNum) {
// if (bytes % alignNum == 0) {
// return bytes / alignNum - 1; //下标从0开始
// }
// else {
// return bytes / alignNum;
// }
//}
//20 3 --> 1
//130 4 --> 8
//好的实现方法
static inline size_t _Index(size_t bytes, size_t align_shift){
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes){
assert(bytes <= MAX_BYTES);
static int group_array[4] = { 16, 56, 56, 56 }; // 每个区间有多少个链
if (bytes <= 128) {
return _Index(bytes, 3); //传2的次方
}
else if (bytes <= 1024) {
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024) {
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024) {
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024) {
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false);
}
return -1;
}
};
<2>Thread Cache类设计
通过上述内存申请分析,可以想到我们需要申请内存函数和释放内存函数(释放内存函数分两种情况:链表长度较短直接将对象挂在链表中,链表长度过长释放链表)以及从中心缓存(central cache)获取对象的一个函数。如下定义:
class ThreadCache {
private:
FreeList _freelists[NFREELIST];
public:
void* Allocate(size_t size); //申请内存
void Deallocate(void* ptr, size_t size); //释放内存
void ListTooLong(FreeList& list, size_t size); //链表太长释放链表
//从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
};
//TLS thread local storage --> 线程局部存储
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
接口实现:
//申请内存
void* ThreadCache::Allocate(size_t size) {
assert(size <= MAX_BYTES);
//传过来申请字节数计算出对齐大小(实际给到的内存大小)
size_t alignSize = SizeClass::AlignSize(size);
size_t index = SizeClass::Index(size);
if (!_freelists[index].Empty()) {
return _freelists[index].popFront();
}
else {
//从中心缓存获取对象
//...
}
}
//释放内存
void ThreadCache::Deallocate(void* ptr, size_t size) {
assert(ptr);
assert(size <= MAX_BYTES);
//找到对应桶位置进行插入
size_t index = SizeClass::Index(size);
_freelists[index].push(ptr);
//当链表长度大于一次申请的最大内存值将其还给central cache
if (_freelists[index].Size() >= _freelists[index].MaxSize()) {
ListTooLong(_freelists[index], size);
}
}
//链表太长释放链表
void ThreadCache::ListTooLong(FreeList& list, size_t size) {
//首先从原链表中将这段链表删除,接着还给中心缓存
//在freelist中增加区间删除函数
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.Size());
//将链表还给中心缓存
//...
}
//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) {
//慢反馈调节算法
//1、最开始不会一次向central cache批量要太多,因为要太多有可能会用不完
//2、如果不断size大小内存需求,那么batchNum就会不断增长直到上限
//3、size越小一次向central cache要的batchNum越小
//4、size越大一次向central cache要的batchNum越大
size_t batchNum = std::min(_freelists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freelists[index].MaxSize() == batchNum) {
_freelists[index].MaxSize() += 1;
}
//调用cnetral cache中获取对象接口
//...
}
线程局部存储TLS(Thread Local Storage)
要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。但不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache。
在FreeList类中添加PopRange函数
void PopRange(void*& start, void*& end, size_t n) {
assert(n <= _size);
start = _freelist;
end = start;
for (int i = 0; i < n - 1; i++) {
end = CurNext(end);
}
_freelist = CurNext(end);
CurNext(end) = nullptr;
_size -= n;
}
在SizeClass类中添加NumMoveSize函数
// 一次thread cache从中心缓存获取多少个
//也就是计算可以给到你几个对象
//其中上限512,下限2也可以理解为限制桶中链表的长度
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
//对象越小,计算出的上限越高
//对象越大,计算出的上限越低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
4、Central Cache
<1>Central Cache类设计
根据上图我们可以知道central cache也是一个哈希桶结构,通中挂的是一个个的span并且是双链表,而sapn中又包含了freellist,如下定义:
首先定义一个SpanNode的节点来表示一个个的Span对象
struct SpanNode {
PAGE_ID _pageId = 0; //大块内存起始页号
size_t _n = 0; //页的数量
SpanNode* _next = nullptr;
SpanNode* _prev = nullptr;
void* _freeList = nullptr; //自由链表
size_t _size = 0; //切好小对象大小
size_t _useCount = 0; //分配给thread cache小块内存数量
bool _isUse = false; //是否正在被使用
};
接着实现一个双链表结构用来将Span挂接起来
class SpanList {
private:
SpanNode* _head = nullptr; //头节点
std::mutex _mtx; //桶锁
public:
SpanList() {
_head = new SpanNode;
_head->_next = _head;
_head->_prev = _head;
}
~SpanList() {
delete _head;
}
std::mutex& getMutex() {
return _mtx;
}
SpanNode* Begin() {
return _head->_next;
}
SpanNode* end() {
return _head;
}
bool Empty() {
return _head->_next == _head;
}
void Insert(SpanNode* pos, SpanNode* newSpan) {
assert(pos && newSpan);
SpanNode* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_prev = prev;
pos->_prev = newSpan;
newSpan->_next = pos;
}
void Erase(SpanNode* pos) {
assert(pos);
assert(pos != _head);
SpanNode* prev = pos->_prev;
SpanNode* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
SpanNode* PopFront() {
SpanNode* ret = _head->_next;
Erase(ret);
return ret;
}
void PushFront(SpanNode* spanNode) {
Insert(Begin(), spanNode);
}
};
Central Cache类定义
//单例模式(懒汉)
class CentralCache {
private:
SpanList _SpanLists[NFREELIST];
private:
CentralCache() {}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst; //类外初始化
public:
static CentralCache* GetInStance() {
return &_sInst;
}
// 获取一个非空的spanNode
SpanNode* GetOneSpan(SpanList& list, size_t size);
// 从中心缓存获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
// 将一定数量的对象释放到span
void ReleaseListToSpans(void* start, size_t byte_size);
};
接口实现:
// 获取一个非空的spanNode
SpanNode* CentralCache::GetOneSpan(SpanList& list, size_t size) {
//查看当前桶中的每个SpanNode节点是否有未分配的对象
SpanNode* it = list.Begin();
while (it != list.end()) {
if (it->_freeList != nullptr) {
return it;
}
else {
it = it->_next;
}
}
//走到这里说明当前桶中每个节点没有未分配的对象了,找pageCache要
//...
}
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) {
size_t index = SizeClass::Index(size);
//这里会涉及多个线程同时从中心缓存获取内存的情况,故应该加锁
_SpanLists[index].getMutex().lock();
SpanNode* spanNode = GetOneSpan(_SpanLists[index], size);
assert(spanNode);
assert(spanNode->_freeList);
//从spanNode中获取batchNum个对象,如果不够有多少拿多少
start = spanNode->_freeList;
end = start;
int i = 0;
int ActualNum = 1;
while (i < batchNum - 1 && CurNext(end) != nullptr) {
end = CurNext(end);
++i;
++ActualNum;
}
//从spanNode将这段链表删除,并统计出该spanNode中自由链表已经使用的数量
spanNode->_freeList = CurNext(end);
CurNext(end) = nullptr;
spanNode->_useCount += ActualNum;
_SpanLists[index].getMutex().unlock();
return ActualNum;
}
关于当链表太长时要将内存还给SpanNode时情况稍微有点复杂,需要好好思考一下。因为还回来的链表中的每个节点的地址我们没办法确定是否连续的。这就导致有可能换回来一个链表但其中的节点分布于_SpanList的多个SpanNode节点中,这就需要对其筛选让其进入不同的桶中(这部分代码我们放到Page Cache中来实现,因为Central Cache中的SpanNode节点都是Page Cache分配给他的),这里我们先把大概逻辑顺一下如下图:
测试代码如下:
void TestAddressPage() {
PAGE_ID id1 = 3000;
PAGE_ID id2 = 3001;
char* p1 = (char*)(id1 << PAGE_SHIFT);
char* p2 = (char*)(id2 << PAGE_SHIFT);
while (p1 < p2) {
cout << (void*)p1 << " : " << ((PAGE_ID)p1 >> PAGE_SHIFT) << endl;
p1 += 8;
}
}
执行结果:
由上图我们可以看出,事实和我们的推论是一样的。释放内存代码如下图:
//将一定数量的对象释放到span
void CentralCache::ReleaseListToSpans(void* start, size_t byte_size) {
size_t index = SizeClass::Index(byte_size);
_SpanLists[index].getMutex().lock();
//将传过来的链表一个个头插进对应的spanNode中
while (start) {
void* next = CurNext(start);
//通过链表每个节点的地址来获取对应的spanNode地址
SpanNode* SpanNode = PageCache::GetInstance()->MapObjectToSpan(start);
CurNext(start) = SpanNode->_freeList;
SpanNode->_freeList = start;
SpanNode->_useCount--; //每头插回来一个已使用数量减一
//当SpanNode已使用数量为0时说明该节点中的自由链表节点都还回来了,继续归还给PageCache处理
if (SpanNode->_useCount == 0) {
//归还给PageCache
//...
}
start = next;
}
_SpanLists[index].getMutex().unlock();
}
5、page cache
首先,central cache的映射规则与thread cache保持一致,而page cache的映射规则与它们都不相同。page cache的哈希桶映射规则采用的是直接定址法,比如1号桶挂的都是1页的span,2号桶挂的都是2页的span,以此类推。
其次,central cache每个桶中的span被切成了一个个对应大小的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,而如何切分申请到的这个span就应该由central cache自己来决定。
注:上图中1page和3page桶下面挂的链在最开始的时候是没有的他们都是由128page(我们每次向系统申请的是固定大小128page)切分后挂到上面的(切分逻辑见下文代码)。
<1>Page Cache类设计
通过上面的分析我们可以知道Page Cache和Central Cache一样是被多个线程共享的故也应该将其设计为单例模式,上面在讲Central Cache释放逻辑是提到我们要根据自由链的节点地址找到其对应SpanNode节点故我们还应该添加其映射关系这里我们采用unordered_map进行映射。此外我们还需要对Central Cache提供对象的接口、回收SpanNode的接口、获取映射关系的接口,如下定义:
static const size_t NPAGES = 129; //下标是从0开始的
class PageCache {
private:
SpanList _pageLists[NPAGES];
std::unordered_map<PAGE_ID, SpanNode*> _idSpanNodeMap;
std::mutex _PageMtx;
private:
PageCache() {};
PageCache(const PageCache&) = delete;
static PageCache _sInst;
public:
static PageCache* GetInstance() {
return &_sInst;
}
std::mutex& GetMutex() {
return _PageMtx;
}
//获取n页SpanNode
SpanNode* NewSpan(size_t n);
//获取从对象到SpanNode的映射
SpanNode* MapObjectToSpanNode(void* obj);
//释放空闲SpanNode回到PageCache,并合并相邻的SpanNode
void ReleaseSpanNodeToPageCache(SpanNode* SpanNode);
};
代码实现:
当Page Cache中无内存时我们需要向系统取申请内存,故我们应该提供一个向系统申请内存的接口,如下
windows和Linux下如何直接向堆申请页为单位的大块内存:
// 去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// linux下sbrk unmmap等
#endif
}
static const size_t NPAGES = 129; //下标是从0开始的
PageCache PageCache::_sInst;
//获取n页SpanNode
SpanNode* PageCache::NewSpan(size_t n) {
assert(n > 0);
//大于128页直接向堆申请内存
if (n > NPAGES - 1) {
void* ptr = SystemAlloc(n);
SpanNode* spanNode = new SpanNode;
spanNode->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; //右移13位相当于除8k
spanNode->_n = n;
_idSpanNodeMap[spanNode->_pageId] = spanNode; //建立映射方便释放内存
return spanNode;
}
//检查对应桶中是否有sapnNode
if (!_pageLists[n].Empty()) {
SpanNode* nSpanNode = _pageLists[n].PopFront();
//将其中节点建立映射关系映射
for (PAGE_ID i = 0; i < nSpanNode->_n; i++) {
_idSpanNodeMap[nSpanNode->_pageId + i] = nSpanNode;
}
return nSpanNode;
}
//检查后面桶中是否有SpanNode节点,有则进行切分
for (size_t i = n + 1; i < NPAGES; i++) {
if (!_pageLists[i].Empty()) {
//找到第i个桶不为空时先将其取出,切分出n个后在放回相应桶中
SpanNode* ISpanNode = _pageLists[i].PopFront();
//开辟出一个节点进行切分,然后返回
SpanNode* NSpanNode = new SpanNode;
NSpanNode->_n = n;
NSpanNode->_pageId = ISpanNode->_pageId;
ISpanNode->_pageId += n;
ISpanNode->_n -= n;
_pageLists[ISpanNode->_n].PushFront(ISpanNode);
//存储ISpanNode起始页号映射方便回收
_idSpanNodeMap[ISpanNode->_pageId] = ISpanNode;
_idSpanNodeMap[ISpanNode->_pageId + ISpanNode->_n - 1] = ISpanNode;
for (PAGE_ID i = 0; i < NSpanNode->_n; i++) {
_idSpanNodeMap[NSpanNode->_pageId + i] = NSpanNode;
}
return NSpanNode;
}
}
//走到这里说明后面的桶中没有剩余的SpanNode节点,找系统申请一页
SpanNode* bigSpanNode = new SpanNode;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpanNode->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpanNode->_n = NPAGES - 1;
_pageLists[bigSpanNode->_n].PushFront(bigSpanNode); //将申请的大页span放入哈希桶中
return NewSpan(n); //重新调用进行切分
}
//获取从对象到SpanNode的映射
SpanNode* PageCache::MapObjectToSpanNode(void* obj) {
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;
std::unique_lock<std::mutex> lock(PageCache::GetInstance()->GetMutex()); //出作用域自动解锁
auto ret = _idSpanNodeMap.find(id);
if (ret != _idSpanNodeMap.end()) {
//找到了进行返回
return ret->second;
}
else {
//找不到说明出问题了直接报错
assert(false);
return nullptr;
}
}
//释放空闲SpanNode回到PageCache,并合并相邻的SpanNode
void PageCache::ReleaseSpanNodeToPageCache(SpanNode* spanNode) {
//大于128页直接还给堆
if (spanNode->_n > NPAGES - 1) {
//根据页号算出响应的地址,然后进行释放
void* ptr = (void*)(spanNode->_pageId << PAGE_SHIFT);
SystemFree(ptr);
return;
}
//向前合并
while (1) {
PAGE_ID prev = spanNode->_pageId - 1;
//找不到跳出循环
auto ret = _idSpanNodeMap.find(prev);
if (ret == _idSpanNodeMap.end()) {
break;
}
//prevSpanNode正在被使用跳出循环
SpanNode* prevSpanNode = ret->second;
if (prevSpanNode->_isUse == true) {
break;
}
//合并出超出128kb的spanNode不进行管理
if (prevSpanNode->_n + spanNode->_n > NPAGES - 1) {
break;
}
//进行合并
spanNode->_pageId = prevSpanNode->_pageId;
spanNode->_n += prevSpanNode->_n;
_pageLists[prevSpanNode->_n].Erase(prevSpanNode);
delete prevSpanNode;
}
//向后合并
while (1) {
PAGE_ID next = spanNode->_pageId - 1;
//找不到跳出循环
auto ret = _idSpanNodeMap.find(next);
if (ret == _idSpanNodeMap.end()) {
break;
}
//prevSpanNode正在被使用跳出循环
SpanNode* nextSpanNode = ret->second;
if (nextSpanNode->_isUse == true) {
break;
}
//合并出超出128kb的spanNode不进行管理
if (nextSpanNode->_n + spanNode->_n > NPAGES - 1) {
break;
}
//进行合并
spanNode->_n += nextSpanNode->_n;
_pageLists[nextSpanNode->_n].Erase(nextSpanNode);
delete nextSpanNode;
}
//放回哈希桶中
_pageLists[spanNode->_n].PushFront(spanNode);
spanNode->_isUse = false;
存储ISpanNode起始页号映射方便回收
_idSpanNodeMap[spanNode->_pageId] = spanNode;
_idSpanNodeMap[spanNode->_pageId + spanNode->_n - 1] = spanNode;
}
6、性能分析
参考了一段别人的测试代码,进行测试: 测试代码链接
这个时候不要去盲目改代码,我们可以在VS中找到性能分析来分析我们的程序看看是什么原因导致的,如下图:
由此我们可以看出,我们大部分时间都浪费在锁上面了而且是map映射的时候,那么有什么办法呢?
其实我们除了上面的锁浪费时间外,还有程序中的new我们可以替换成一个定长内存池(因为其效率比new要高一些,大家可以自行测试一下)。
<1>定长内存池实现
malloc其实就是一个通用的内存池,在什么场景下都可以使用,但这也意味着malloc在什么场景下都不会有很高的性能,因为malloc并不是针对某种场景专门设计的。定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此我们性能方面要比malloc高一些,并且在实现定长内存池时不需要考虑内存碎片等问题,因为我们申请/释放的都是固定大小的内存块。
代码如下:
template<class T>
class FiedMemoryPool {
private:
char* _memory = nullptr; //指向大块内存的指针
void* _freeList = nullptr;//还内存链接自由链表头指针
size_t _residueBytes = 0; //剩余字节数
public:
T* New() {
T* obj = nullptr;
//优先把还回来的对象重复利用
if (_freeList) {
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next;
}
else {
//剩余内存不够一个对象大小时重新开辟内存
if (_residueBytes < sizeof(T)) {
_residueBytes = 128 * 1024;
_memory = (char*)SystemAlloc(_residueBytes >> PAGE_SHIFT);
if (_memory == nullptr) {
throw std::bad_alloc();
}
}
//给目标分配内存
obj = (T*)_memory;
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //至少开辟一个指针大小
_memory += objSize;
_residueBytes -= objSize;
}
//用定位new显示调用其构造函数
new (obj)T;
return obj;
}
void Delete(T* obj) {
//显示调用其析构函数
obj->~T();
//头插进自由链表
*(void**)obj = _freeList;
_freeList = obj;
}
};
有了定长内存池我们只需要将代码中new出来的对象替换成用定长内存池来申请就可以了(new 底层依然是调用malloc)由于这部分替换起来比较简单就不在详细讲述了,具体代码参考文章末尾项目源代码。
<2>基数树
基数树之所以能够提升效率是因为基数树在使用时是不用加锁的,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射。
基数树代码链接
有了基数树我们只需要将PageCache中unorder_map定义的对象用基数树来定义就可以了,具体代码实现参考文章末尾源代码实现。
使用上面两种方法对项目进行优化后在进行测试,如图:
由上图很明显可以看出,此时我们实现的内存池在并发环境下效率更胜一筹。
7、项目源码
项目源码
项目到这里就算完成了,感觉有用的话期待大家点赞关注,项目中有哪些地方有疑问的话欢迎大家评论留言。