🎉项目:高并发内存池
博主主页:桑榆非晚ᴷ
博主能力有限,如果有出错的地方希望大家不吝赐教
给自己打气:成功没有快车道,幸福没有高速路。所有的成功,都来自不倦地努力和奔跑,所有的幸福都来自平凡的奋斗和坚持🥰🎉✨
高并发内存池
- 🎉项目:高并发内存池
- 一、项目介绍
- 1.1 项目内容
- 1.2 技术栈
- 二、内存池定义
- 2.1 池化技术
- 2.2 内存池
- 2.3 内存池带来的好处
- 2.4 malloc
- 三、项目编写
- 3.1 开胃菜---定长内存池的设计
- 3.2 高并发内存池整体框架设计
- 3.2.1 高并发内存池--thread cache
- 3.2.2 高并发内存池--central cache
- 3.2.3 高并发内存池--page cache
- 3.2.4 项目源码
- 四、项目的性能分析及其优化
- 4.1 项目瓶颈分析
- 4.2 项目性能优化
- 4.2.1 使用基数树进行优化
一、项目介绍
1.1 项目内容
当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称 Thread-CachingMalloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
我们这个项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习tcmalloc的精华,学习tcmlloc的这种框架设计。这种方式有点类似我们之前学习STL容器的方式。但是相比STL容器部分,tcmalloc的代码量和复杂度上升了很多,大家要有心理准备。当前另一方面,难度的上升,我们的收获和成长也是在这个过程中同步上升。
tcmalloc源码
1.2 技术栈
这个项目会用到C/C++、C++11、数据结构(链表、哈希桶、基数树)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。
二、内存池定义
2.1 池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态,这样一来就可以提高程序的运行效率。
2.2 内存池
内存池是指程序预先从操作系统申请一块足够大内存。此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
2.3 内存池带来的好处
内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那么什么是内存碎片呢?
再需要补充说明的是内存碎片分为外碎片和内碎片,上图体现的是外碎片问题。外部碎片是一些空闲的小的连续内存区域,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求(比如向上图的问题)。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。内碎片问题,我们后面项目就会看到,那会再进行更准确的理解。
2.4 malloc
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。下面有几篇关于这块的文章,大概可以去简单看看了解一下,关于ptmalloc,学完我们的项目以后,有兴趣大家可以去看看他的实现细节。
malloc的底层实现ptmalloc
三、项目编写
3.1 开胃菜—定长内存池的设计
用来描述定长内存池的三个属性分别为用来存储向系统申请内存空间的首地址_memory,上层向内存池申请定长内存后内存池还剩下的内存大小_leftBytes,最后就是上层free后要把对象内存空间挂接到自由链表_freeList当中,如下图,_freeList的前4个或者前8字节保存的就是下一个释放后对象内存的地址,那么如果上层要申请内存的时候优先去_freeList当中去获取对象内存,只有当_freeList为空的时候,才去_memory当中去申请对象内存。下面简单实现一下这段逻辑。
windows和Linux下如何直接向堆申请大块内存:
windows下向堆直接申请空间的接口
Linux下向堆直接申请空间的接口
#include <iostream>
#include <vector>
#include <ctime>
using std::cout;
using std::endl;
#ifdef _WIN32
#include <Windows.h>
#else
#include <unistd.h>
#endif
// 向操作系统直接申请空间
inline static void *SystemAlloc(size_t npage)
{
#ifdef _WIN32
void *ptr = VirtualAlloc(0, npage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
void *ptr = sbrk(npage << 13);
#endif
if (ptr == nullptr || ptr == (void *)-1)
{
throw std::bad_alloc();
}
return ptr;
}
template<class T>
class ObjectPool
{
public:
T *New()
{
T *obj = nullptr;
// 优先把归还回来的内存块对象,再次重复利用
if (_freeList)
{
// 单链表的头删
void *next = *static_cast<void **>(_freeList);
obj = reinterpret_cast<T *>(_freeList);
_freeList = next;
}
else
{
// 保证一个对象的大小至少是一个指针的大小
size_t objSize = sizeof(T) < sizeof(void *) ? sizeof(void *) : sizeof(T);
// 防止剩下的空间不够开辟一个对象内存大小而导致内存越界
if (_remainBytes < objSize)
{
//_memory = static_cast<char *>(malloc(128 * 1024));
_memory = static_cast<char *>(SystemAlloc(8));
if (nullptr == _memory)
throw std::bad_alloc();
_remainBytes = 8 << 13;
}
obj = reinterpret_cast<T *>(_memory);
_memory += objSize;
_remainBytes -= objSize;
}
// 定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
void Delete(T *obj)
{
// 显示调用析构函数清理对象
obj->~T();
// 将归还回来的内存块头插到_freeList当中
// 解决32位和64位平台下指针大小不同的问题
*(reinterpret_cast<void **>(obj)) = _freeList;
_freeList = obj;
}
private:
// 为什么不用void*或者其他类型的指针呢?因为void*类型的指针进行++,--,解引用操作不确定
// 其他类型指针不如char*指针使用灵活
char *_memory = nullptr; // 指向大块内存的指针
size_t _remainBytes = 0; // 大块内存在切分过程中剩余的字节数
void *_freeList = nullptr; // 还回来过程中链接到自由链表的头指针
};
3.2 高并发内存池整体框架设计
concurrent memory pool主要由以下3个部分构成:
- thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一cache,这也就是这个并发线程池高效的地方。
- central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
3.2.1 高并发内存池–thread cache
thread cache图像模型:这里按照不同的内存区间相应的对齐数大小也不同,这样就会很好的减低内碎片的浪费,把内碎片的浪费尽量控制再10%左右,下面位不同内存区间的对齐规则。
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
由于区间[1,128]其所开的空间太小,所以它的内碎片浪费不能得到很好得解决,除非将对齐数减小,但是这里为了兼容64位和32位的操作系统选择了8字节对齐。下面来计算一下各个区间内碎片浪费的比率。
- [1,128] 8byte对齐,内碎片内存浪费比率:假设开了1byte的内存,由于对齐数为8byte,所以还为其的多开辟7个byte,而这7个byte就是由于内存对齐而造成的内碎片问题,所以这段区间的最大内碎片浪费为7 / 8= 87.5%,而且该区间内一共有128 / 8 = 16个自由链表桶,对应哈希表的[0,16)桶。
- [129, 1024] 18byte对齐,内碎片内存浪费比率:假设开辟129byte的内存,由于对齐数为16byte,所以还要为其多开辟15个byte,而这15个byte就是由于内存对齐而造成的内存碎片问题,所以这个段区间的最大内碎片浪费为15 / 144 = 10.4%,而且该区间内一共有 (1024 - 129 + 1)/ 16 = 56个自由链表桶,对应哈希表的[16, 72)桶。
- [1025, 8 * 1024] 128byte对齐,内碎片内存浪费比率:假设开辟1025byte的内存,由于对齐数为128byte,所以还要为其多开辟127个byte,而这127个byte就是由于内存对齐而造成的内存碎片问题,所以这个段区间的最大内碎片浪费为127 / 1152 = 11.0%,而且该区间内一共有 ( 8*1024 - 1025 + 1)/ 128 = 56个自由链表桶,对应哈希表的[72, 128)桶。
- [8 * 1024 + 1,64 * 1024] 1024byte对齐,内碎片内存浪费比率:假设开辟8 * 1024 + 1byte的内存,由于对齐数为1024byte,所以还要为其多开辟1023byte,而这1023byte就是由于内存对齐而造成的内存碎片问题,所以这个段区间的最大内碎片浪费为1023 / 9 * 1024 = 11.1%,而且该区间内一共有(64 * 1024 - 8 * 1024 - 1 + 1) / 1024 = 56个自由链表,对应哈希表[128, 184)桶。
- [64 * 1024 + 1, 256 * 1024] 8 * 1024byte对齐,内碎片内存浪费比率:假设开辟128 * 1024 + 1byte的内存,由于对齐数为8 * 1024byte,所以还要为其多开辟8 * 1024 - 1byte,而这8 * 1024 - 1byte就是由于内存对齐而造成的内存碎片问题,所以这个段区间的最大内碎片浪费为8 * 1024 - 1/ 72* 1024 = 11.1%,而且该区间内一共有(256 * 1024 - 64 * 1024 - 1 + 1) / 8 * 1024 = 24个自由链表,对应哈希表[184, 208)桶。
申请内存:
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
- 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
- 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
- 当链表的长度过长,则回收一部分内存对象到central cache。
class FreeList
{
public:
// 将单个对象内存以头插的方式插入到自由链表当中
void Push(void* obj)
{}
// 将一定数量的对象内存以头插的方式插入到自由链表当中
void PushRange(void* start, void* end, size_t n)
{}
// 将单个对象内存以头删的方式从自由链表当中删除
void* Pop()
{}
// 将多个对象内存以头删的方式从自由链表当中删除
void PopRange(void*& start, void*& end, size_t n)
{}
// 判断自由链表是非为空
bool Empty()
{}
// 获取一次向central cache申请对象的最大个数
size_t& MaxSize()
{}
// 获取自由链表中对象内存的个数
size_t Size()
{}
private:
void* _freeList = nullptr; // 自由链表
size_t _maxSize = 1; // 一次向central cache申请对象的最大个数
size_t _size = 0; // 自由链表中对象内存的个数
};
// thread cache本质是由一个哈希映射的对象自由链表构成,如上图所示
class ThreadCache
{
public:
// 申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
// 释放对象时,链表过长时,回收内存回到中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
// NFREELIST 是自由链表的个数,有由上面分析一共有208个自由链表,也就是有208个哈希桶
FreeList _freeLists[NFREELIST];
};
自由链表的哈希桶跟对象大小的映射关系
// 小于等于MAX_BYTES,就找thread cache申请
// 大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
// thread cache 和 central cache自由链表哈希桶的表个数
static const size_t NFREELISTS = 208;
// page cache 管理span list哈希表大小
static const size_t NPAGES = 129;
// 页大小转换偏移, 即一页定义为2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;
// 地址大小类型,32位下是4byte类型,64位下是8byte类型
#ifdef _WIN32
typedef size_t ADDRES_INT;
#else
typedef unsigned long long ADDRES_INT;
#endif // _WIN32
// 页编号类型,32位下是4byte类型,64位下是8byte类型
#ifdef _WIN32
typedef size_t PageID;
#else
typedef unsigned long long PageID;
#endif // _WIN32
// 获取内存对象中存储的头4 or 8字节值,即链接的下一个对象的地址
inline void*& NextObj(void* obj)
{
return *((void**)obj);
}
// 管理对齐和映射等关系
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
static inline size_t _RoundUp(size_t bytes, size_t align)
{
return (((bytes)+align - 1) & ~(align - 1));
}
// 对齐大小计算
static inline size_t RoundUp(size_t bytes)
{
if (bytes <= 128){
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024){
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024){
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024){
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024){
return _RoundUp(bytes, 8 * 1024);
}
else{
return _RoundUp(bytes, 1 << PAGE_SHIFT);
}
return -1;
}
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128){
return _Index(bytes, 3);
}
else if (bytes <= 1024){
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024){
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024){
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1]
+ group_array[0];
}
else if (bytes <= 256 * 1024){
return _Index(bytes - 64 * 1024, 13) + group_array[3] +
group_array[2] + group_array[1] + group_array[0];
}
else{
assert(false);
}
return -1;
}
// 一次从中心缓存获取多少个
static size_t NumMoveSize(size_t size)
{
if (size == 0)
return 0;
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 小对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
// 计算一次向系统获取几个页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num*size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
};
3.2.2 高并发内存池–central cache
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
申请内存:
- 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对 象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的 spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
- central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的 span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span 中取对象给thread cache。
- central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count。
释放内存:
- 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
// 单例模式 饿汉模式
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// 从中心缓存获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t
size);
// 将一定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanLists[NFREELIST]; //
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
以页为单位的大内存管理span的定义及spanlist定义:
// Span管理一个跨度的大块内存
// 管理以页为单位的大块内存
// 管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; // 大块内存起始页的页号
size_t _n = 0; // 页的数量
Span* _next = nullptr; // 双向链表的结构
Span* _prev = nullptr;
size_t _objSize = 0; // 切好的小对象的大小
size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数
void* _freeList = nullptr; // 切好的小块内存的自由链表
bool _isUse = false; // 是否在被使用
};
// 带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
// 返回span的有效数据的头节点
Span* Begin()
{
return _head->_next;
}
// 返回span的无效头节点
Span* End()
{
return _head;
}
// 判断span链表是否为空
bool Empty()
{
return _head->_next == _head;
}
// 向span链表中头插一个span
void PushFront(Span* span)
{
Insert(Begin(), span);
}
// 从span链表中的头部获取一个span
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
// 再span链表中的pos位置之前插入newSpan
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
// prev newspan pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
// 删除span链表中位置为pos的span
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
}
private:
Span* _head;
public:
std::mutex _mtx; // 桶锁
};
3.2.3 高并发内存池–page cache
申请内存:
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有 则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页pagespan和一个6页page span。
- 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式 申请128页page span挂在自由链表中,再重复1中的过程。
- 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质 区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
释放内存:
- 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span, 看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少 内存碎片。
PageCache 代码框架:
// 1.page cache是一个以页为单位的span自由链表
// 2.为了保证全局只有唯一的page cache,这个类被设计成了单例模式。
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
// 获取一个K页的span
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
ObjectPool<Span> _spanPool;
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
//std::map<PAGE_ID, Span*> _idSpanMap;
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
3.2.4 项目源码
四、项目的性能分析及其优化
4.1 项目瓶颈分析
在多线程环境下,用我们写的内存池和系统中的内存池进行性能对比,我们可以很明显的发现我们所写内存池还没有系统中的malloc、free的效率高,不是说在多线程环境下我们的内存池会比系统中的内存池效率会高吗?这里我们的内存池究竟是在那块出现了性能瓶颈,进行来我们来一起分析一下:
项目性能分析:
由上图可见,我们这个项目的性能消耗大部分都是因为加锁而引起的,为了挺高性能,我们必须要对加锁这种策略进行改造或者替换这个策略。这里我们参考tcmalloc中的优化策略,tcmalloc中使用基数树替换了加锁的策略,因为基数树这种数据结构在读取的时候是不需要加锁的,这样一来就可以避免了多线程并发的时候锁竞争带来的性能损耗了。
4.2 项目性能优化
4.2.1 使用基数树进行优化
这里我们使用tcmalloc中的基数树来对项目进行优化,tcmalloc中使用它的理由主要有两个:
- 很好的支持了<int, int>的映射关系
- 可以在读取该数据结构的不需要加锁,这也是提高性能的主要因素
下面为tcmalloc中基数树的源码,我们这里直接引用它并对其进行简单的改造:
#pragma once
#include"Common.h"
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
使用基数树优化后,我们写的高并发内存池比malloc块上了许多。
分析一下为什在使用基数树的情况下,读_idSpanMap不许加锁呢?
写只会在两个地方进行写
- NewSpan:
a. 将返回给上层的CentralCache的kSpan中的每一个PageId与kSpan进行映射。
b. 将切分后留在PageCache中的nSpan中的首位PageId与nSpan进行映射。- ReleaseSpanToPageCache:
a. 前后页合并后,会将合并后的大的span的首位PageId与span进行映射。读只会在三个地方进行读
- CurrentFree:
a. 根据要释放的指针找到该指针所处的PageId,然后根据PageId找到指针所处的Span,获取对象内存大小。
b. 同时,如果获取到对象内存大小大于256kb,直接将span在PageCache中进行释放。- ReleaseListToSpan:
a. 当ThreadCache层归还给freeList的内存对象个数大于等于一次向CentralCache申请内存对象个数时,会将freeList当中一定数量的内存对象归还给CentralCache。为了直到要归还给SpanList当中的哪个span,所以要读取idSpanMap(ptr -> PageId -> span)。- ReleaseSpanToPageCache:
a. 如果ThreadCache将CentralCache中的某一个spanList中的某一个span中的内存对象全部归还回来了,那么该span就会继续向下一层PageCache进行归还。在归还的过程中会进行前后页的合并,来缓解外碎片问题。那么就需要直到前面的span,和后面的span,所以要进行读取idSpanMap。写是否还需要加锁?
需要,虽然多个线程不会对同一个span进行NewSpan和ReleaseSpanToPageCache。但是多个线程会同时NewSpan和RealseSpanToPageCache,这样的化NewSpan和RealseSpanToPageCache会出现线程安全。
读是否需要加锁呢?
不需要。可以从两个方面进行思考。数据结构和数据内容。
使用基数树这种数据结构空间是提前开好的,不会在写的时候进行扩容。
因为读写是分离的,在代码编写真确的前提下,读之前一定是已经将内容写好了,因为读的时候都是释放内存对象的过程,在申请内存对象时进行写入。