🌠 作者:@阿亮joy.
🎆专栏:《项目设计》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
目录
- 👉项目介绍👈
- 👉什么是内存池👈
- 池化技术
- 内存池
- 内存池主要解决的问题
- malloc
- 👉高并发内存池的设计👈
- 定长的内存池
- 高并发内存池整体框架设
- thread cache整体设计
- central cache整体设计
- page cache整体设计
- thread cache回收内存
- central cache回收内存
- page cache回收内存
- 大于256KB的大块内存申请问题
- 使用定长内存池配合脱离new
- 释放对象时优化为不传对象大小
- 多线程环境下对比malloc测试
- 使用基数树进行优化
- 👉扩展学习👈
👉项目介绍👈
当前项目是实现一个高并发的内存池,它的原型是 Google 的一个开源项目tcmalloc,tcmalloc 全称为 Thread-Caching Malloc,即线程缓存的 malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
我们这个项目是把 tcmalloc 最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习 tcamlloc 的精华,这种方式有点类似我们之前学习 STL 容器的方式。但是相比 STL 容器部分,tcmalloc 的代码量和复杂度上升了很多,大家要有心理准备。当前另一方面,难度的上升,我们的收获和成长也是在这个过程中同步上升。
另一方面 tcmalloc 是全球大厂 Google 开源的,可以认为当时顶尖的 C++ 高手写出来的,它的知名度也是非常高的,不少公司都在用它,Go 语言直接用它做了自己内存分配器。所以很多程序员是熟悉这个项目的,那么有好处,也有坏处。好处就是把这个项目理解扎实了,会很受面试官的认可。坏处就是面试官可能也比较熟悉项目,对项目会问得比较深,比较细。如果你对项目掌握得不扎实,那么就容易碰钉子。
tcmalloc源代码
这个项目会用到 C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。这个项目的难度较大,不过没有什么关系,我们一点点来完成它。
👉什么是内存池👈
池化技术
所谓的池化技术就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用池化这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池
内存池是指程序预先从操作系统申请一块足够大内存。此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池主要解决的问题
内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那么什么是内存碎片呢?
需要再补充说明的是内存碎片分为外碎片和内碎片,上面我们讲的外碎片问题。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。内碎片问题,我们后面项目就会看到,那会再进行更准确的理解。
malloc
C / C++ 中我们要动态申请内存都是通过 malloc 去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而 malloc 就是一个内存池。malloc 相当于向操作系统批发了一块较大的内存空间,然后零售给程序用。当全部售完或程序有大量的内存需求时,再根据实际需求向操作系统进货。malloc 的实现方式有很多种,一般不同编译器平台用的都是不同的。比如 Windows 的 VS系列用的微软自己写的一套,Linux gcc用的 glibc 中的 ptmalloc。下面有几篇关于这块的文章,大概可以去简单地看看了解一下。关于 ptmalloc,学完项目以后,大家可以去看看他的实现细节。
拓展阅读:
一文了解,Linux内存管理,malloc、free 实现原理
malloc()背后的实现原理——内存池
malloc的底层实现(ptmalloc)
👉高并发内存池的设计👈
定长的内存池
作为 C / C++ 程序员,我们知道申请内存使用的是 malloc,malloc 其实就是一个通用的大众货,什么场景
下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能,下面我们就先来设计一个定长内存池做个开胃菜。当然,这个定长内存池在我们后面的高并发内存池中也是有价值的,所以学习它有两层目的,先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。
特点:性能达到极致、不考虑内存碎片问题。
Windows 和 Linux 下如何直接向堆申请页为单位的大块内存:VirtualAlloc、brk 和 mmap。
// ObjectPool.h
// ObjectPool是针对某一类对象的定长内存池
#pragma once
#include <iostream>
#include <vector>
#include <time.h>
// 工程项目中不要将std整个命名空间展开,避免命名污染
using std::cout;
using std::endl;
// 操作系统的控制,如果是Windows系统则包含windows.h
// 如果是Linux系统,则包含brk系统调用对应的头文件
#ifdef _WIN32
#include <windows.h>
#else
//
#endif
// 因为要实现的高并发内存池要使用到定长
// 内存池,所以就不采用下方的实现方式了
// 定长内存池
//template <size_t N>
//class ObjectPool
//{};
// 直接去堆上按页申请空间(Windows的VirtualAlloc和Linux的brk)
// kpage是页数,假设一页是8KB
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk和mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc(); // 申请内存失败则抛异常
return ptr;
}
template <class T>
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr; // obj为申请到的内存
// 优先把还回来的内存重复使用
if (_freeList != nullptr)
{
// 将_freeList强转为void**,对其解引用即可
// 得到四个字节(32位)或八个字节(64位)的地址
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next; // 指向下一块内存
}
else
{
// 剩余内存_leftBytes不够一个对象的大小时,则只
// 能丢掉剩余内存,再去申请大块内存
if (_leftBytes < sizeof(T))
{
_leftBytes = 128 * 1024;
// 使用VirtualAlloc是为了完全和malloc脱离
//_memory = (char*)malloc(_leftBytes);
// 假设一页是8KB,右移13位即可算出页数
_memory = (char*)SystemAlloc(_leftBytes >> 13);
// 申请内存是否抛出bad_alloc异常
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
obj = (T*)_memory;
// 保证申请的内存大小至少是一个指针的大小
// 以保证能够存下一块内存的起始地址
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objSize;
_leftBytes -= objSize;
}
// 定位new,显式调用T的构造函数初始化
new(obj) T;
return obj;
}
// 释放的内存头插到自由链表中
void Delete(T* obj)
{
// 显示调用T的析构函数清理对象
obj->~T();
// 将_freeList强转为void**,对其解引用
// 即可得到四个字节或八个字节的地址
// 头插
*(void**)obj = _freeList;
_freeList = obj;
}
private:
// 使用char*比较方便,因为其加一就是跳过一个字节
char* _memory = nullptr; // 指向大块内存的指针
size_t _leftBytes = 0; // 大块内存在切分过程中剩余字节数
void* _freeList = nullptr; // 自由链表的头指针,自由链表中存的是返回来的内存
};
性能对比
// Test.cpp
#include "ObjectPool.h"
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};
void ObjectPoolTest()
{
// 申请释放的轮次
const size_t Rounds = 5;
// 每轮申请释放多少次
const size_t N = 1000000;
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
ObjectPool<TreeNode> Pool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(Pool.New());
}
for (int i = 0; i < N; ++i)
{
Pool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
int main()
{
ObjectPoolTest();
return 0;
}
注:对象池申请到内存是不用释放的,也无法正常释放,因为申请到的内存已经被切分成一小块一小块的内存了。只要进程是正常结束的,操作系统会自动回收进程申请的内存。
高并发内存池整体框架设
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc 本身其实已经很优秀,而我们项目的原型 tcmalloc 在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
- 性能问题
- 多线程场景下,锁竞争的问题
- 内存碎片问题
Concurrent Memory Pool 主要由以下三个部分构成:
- thread cache:线程缓存是每个线程独有的,用于小于256KB 的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个 cache,这也就是这个并发线程池高效的地方。
- central cache:中心缓存是所有线程所共享,thread cache 是按需从 central cache 中获取内存的对象。central cache 会在合适的时机回收 thread cache 中的内存对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache 是存在竞争的,所以从这里取内存对象是需要加锁。首先这里用的是桶锁,其次只有 thread cache 没有内存对象时才会找 central cache,所以这里竞争不会很激烈。
- page cache:页缓存是在 central cache 缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache 没有内存对象时,从 page cache 分配出一定数量的 page,并切割成定长大小的小块内存,分配给 central cache。当一个 span 的几个跨度页的对象都回收以后,page cache 会回收 central cache 满足条件的 span 对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
thread cache整体设计
thread cache 是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个 thread cache 对象,这样每个线程在这里获取对象和释放对象时是无锁的。
申请内存:
- 当申请的内存 size 小于等于 256 KB,先获取到线程本地存储的 thread cache 对象,计算 size 映射的哈希桶自由链表下标
i
。- 如果自由链表
_freeLists[i]
中有对象,则直接 Pop 一个内存对象返回。- 如果
_freeLists[i]
中没有对象时,则批量从 central cache 中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
- 当释放的内存 size 小于或等于 256 KB时,将内存释放回 thread cache,计算 size 映射自由链表桶位置
i
,将对象 Push 到_freeLists[i]
。- 当链表的长度过长,则回收一部分内存对象到 central cache。
TLS–thread local storage(线程局部存储):linux gcc下 tls。
Common.h 是公共的头文件,主要定义了自由链表和负责内存对齐规则的类;自由链表负责将还回来的内存管理起来,负责内存对齐的类主要是将申请的内存大小上调的对齐数的倍数以及将其映射到对应的哈希桶中去。
// Common.h
#pragma once
#include <iostream>
#include <vector>
#include <time.h>
#include <assert.h>
#include <thread>
// 工程项目中不要将std整个命名空间展开,避免命名污染
using std::cout;
using std::endl;
// 操作系统的控制,如果是Windows系统则包含windows.h
// 如果是Linux系统,则包含brk系统调用对应的头文件
#ifdef _WIN32
#include <windows.h>
#else
//
#endif
static const size_t MAX_BYTES = 256 * 1024; // 向thread cache申请内存的最大值
static const size_t NFREELIST = 208; // thread cache哈希桶的个数
// 直接去堆上按页申请空间(Windows的VirtualAlloc和Linux的brk)
// kpage是页数,假设一页是8KB
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
// static修饰函数保持在当前文件可见
// 该函数以后也要用到,所以设置将其为全局函数了
// 返回obj的下一个内存
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
// 管理切分好的小块内存的自由链表
class FreeList
{
public:
// 将释放的内存头插到自由链表中
void Push(void* obj)
{
// 头插
NextObj(obj) = _freeList;
_freeList = obj;
}
// 申请内存:自由链表头删即可
void* Pop()
{
assert(_freeList); // 确保自由链表不为空
// 头删
void* obj = _freeList;
_freeList = NextObj(obj);
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
private:
void* _freeList = nullptr;
};
// 该类负责对象大小的内存对齐规则
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
// 总共208个哈希桶
// 将申请的内存bytes上调到align的倍数(align为对齐数)
static inline size_t _RoundUp(size_t bytes, size_t align)
{
return ((bytes + align - 1) & ~(align - 1));
}
//static inline size_t _RoundUp(size_t bytes, size_t align)
//{
// size_t alignSize = 0;
// if (bytes % align != 0)
// alignSize = (bytes / align + 1) * align;
// else
// alignSize = bytes;
// return alignSize;
//}
static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
// 走到这里说明申请的内存大于MAX_BYTES
assert(false);
return -1;
}
}
// align是对齐数
//size_t _Index(size_t bytes, size_t align)
//{
// return (bytes + align - 1) / align - 1;
//}
// 1 + 7 8
// 2 + 7 9
// ...
// 8 + 7 15
// 2通过左移alignShift可以得到align
static inline size_t _Index(size_t bytes, size_t alignShift)
{
return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 某个对齐数哈希桶的总个数
static int groupArrry[4] = { 16, 56, 56, 56 };
if(bytes <= 128)
return _Index(bytes, 3);
else if (bytes <= 1024)
return _Index(bytes - 128, 4) + groupArrry[0];
else if (bytes <= 8 * 1024)
return _Index(bytes - 1024, 7) + groupArrry[1] + groupArrry[0];
else if (bytes <= 64 * 1024)
return _Index(bytes - 8 * 1024, 10) + groupArrry[2] + groupArrry[1] + groupArrry[0];
else if (bytes <= 256 * 1024)
return _Index(bytes - 64 * 1024, 13) + groupArrry[3] + groupArrry[2] + groupArrry[1] + groupArrry[0];
else
{
assert(false);
return -1;
}
}
};
ConcurrentAlloc.h 中定义的是高并发内存池申请和释放内存的接口。
// ConcurrentAlloc.h
#pragma once
#include "Common.h"
#include "ThreadCache.h"
// 高并发内存池申请和释放内存的接口
static void* ConcurrentAlloc(size_t size)
{
// 通过TLS,每个线程就可以无锁地获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
// 下面的这句代码可以注释掉
cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
// 一般Free接口是不需要指明释放内存的大小的,后面会优化
static void ConcurrentFree(void* ptr, size_t size)
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
一个进程里可能会有多个线程,线程共享整个进程地址空间,每个线程有自己独立的栈、寄存器等。因为要保证每个线程独享一个 thread cache,而我们又不想加锁来实现,那么就可以通过线程局部来实现。
// ThreadCache.h
#pragma once
#include "Common.h"
class ThreadCache
{
public:
// 申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
// 从中心缓存中获取对象
void* FetchFromCentralCache(size_t index, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
// TLS thread local storage(线程局部存储)
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
// ThreadCache.cpp
#include "ThreadCache.h"
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
// 对应的桶中有内存,则直接去桶中取内存
// 如果没有,则向CentralCache申请内存
if (!_freeLists[index].Empty())
return _freeLists[index].Pop();
else
return FetchFromCentralCache(index, alignSize);
}
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找对映射的自由链表桶,将释放的内存头插到自由链表中
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
// 待完善:当自由链表过长时,将自由链表的
// 一部分内存还给central cache
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
return nullptr; // 待完善
}
单元测试:测试线程局部存储是否真的可以抱枕每个线程都有独自的 ThreadCache 对象,可以通过调试来仔细观察。
void Alloc1()
{
for (size_t i = 0; i < 5; ++i)
{
void* ptr = ConcurrentAlloc(6);
}
}
void Alloc2()
{
for (size_t i = 0; i < 5; ++i)
{
void* ptr = ConcurrentAlloc(7);
}
}
void TLSTest()
{
std::thread t1(Alloc1);
t1.join();
std::thread t2(Alloc2);
t2.join();
}
central cache整体设计
central cache 也是一个哈希桶结构,它的哈希桶的映射关系跟 thread cache 是一样的。不同的是他的每个哈希桶位置挂是 SpanList 链表结构,不过每个映射桶下面的 span 中的大内存块被按映射关系切成了一个个小内存块对象挂在 span 的自由链表中。
申请内存:
- 当 thread cache 中没有内存时,就会批量向 central cache 申请一些内存对象,这里的批量获取对象的数量使用了类似网络 tcp 协议拥塞控制的慢开始算法;central cache 也有一个哈希映射的 spanlist,spanlist 中挂着 span,从 span 中取出对象给 thread cache。这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
- central cache 映射的 spanlist 中所有 span 的都没有内存以后,则需要向 page cache 申请一个新的 span 对象,拿到 span 以后将 span 管理的内存按大小切好作为自由链表链接到一起。然后从 span 中取对象给 thread cache。
- central cache 的中挂的 span 中 use_count 记录分配了多少个对象出去,分配一个对象给 thread cache,就 ++use_count。
释放内存:
当 thread_cache 过长或者线程销毁,则会将内存释放回central cache 中的,释放回来时 --use_count,线程返回来的内存也可以给其他线程使用。当 use_count 减到 0 时,则表示所有对象都回到了 span,则将span 释放回 page cache,page cache 中会对前后相邻的空闲页进行合并,以解决内存碎片问题。
Common.h 头文件中增加了 Span 和 SpanList 的定义,主要是服务于 CentralCache 的实现。
#include <thread>
#include <mutex>
// 64位平台下,_WIN64和_WIN32都有定义
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
// linux
#endif
// 管理切分好的小块内存的自由链表
class FreeList
{
public:
size_t& MaxSize()
{
return _maxSize;
}
// 将一段内存头插到自由链表中
void PushRange(void* start, void* end)
{
NextObj(end) = _freeList;
_freeList = start;
}
private:
size_t _maxSize = 1;
};
// 该类负责对象大小的内存对齐规则
class SizeClass
{
public:
// thread cache一次从中心缓存获取多少个内存对象
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 小对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
};
// 管理多个连续页的大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; // 大块内存起始页的页号
size_t _n = 0; // 页的数量
Span* _prev = nullptr; // 双向链表的结果
Span* _next = nullptr;
size_t _useCount = 0; // 切分好的小块内存被ThreadCache使用的计数
void* _freeList = nullptr; // 小块内存组成的自由链表
};
// 带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_prev = _head;
_head->_next = _head;
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
// prev newSpan pos
Span* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head); // 防止删除哨兵位头节点
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head; // 哨兵位头节点
public:
std::mutex _mtx; // 桶锁
};
CentralCache 也是一个哈希桶的结构,其映射的规则和 ThreadCache 的映射规则是一致的。因为多个线程是同用一个 CentralCache 的,所以要将 CentralCache 设计成单例模式,同时还需要对其进行加锁保护。注意:这个锁是桶锁。
// CentralCache.h
#pragma once
#include "Common.h"
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t size);
// 从中心缓存获取一定数量的size大小的内存对象给ThreadCache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
private:
SpanList _spanLists[NFREELIST];
private:
// 单例模式保证多个线程共用一个CentralCache
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
// CentralCache.cpp
#include "Common.h"
#include "CentralCache.h"
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 待完善
return nullptr;
}
// 从中心缓存获取一定数量的size大小的内存对象给ThreadCache
// 注:start和end都是输出型参数
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
// 访问_spanLists[index],需要将桶锁给加上
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
// 从span中获取batchNum个对象
// 如果不够batchNum个,则有多少拿多少
start = span->_freeList;
end = start;
size_t i = 0;
size_t actualNum = 1;
while (i < batchNum - 1 && NextObj(end) != nullptr)
{
++i;
++actualNum;
end = NextObj(end);
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
span->_useCount += actualNum; // ThreadCache拿走了actualNum个内存对象
_spanLists[index]._mtx.unlock();
return actualNum;
}
// ThreadCache.cpp
// index和size都是已经算好了的
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// 慢开始反馈调节算法
// 1. 最开始不会一次向CentralCache要太多小块内存,因为要太多可能用不完
// 2. 如果不断有size大小的内存需求,那么batchNum就会不断增长,直到上限
// 3. size越大,一次向CentralCache要的batchNum就越小
// 4. size越小,一次向CentralCache要的batchNum就越大
size_t batchNum = min(SizeClass::NumMoveSize(size), _freeLists[index].MaxSize());
// 慢慢增加自由链表的最大长度
if (batchNum == _freeLists[index].MaxSize())
{
_freeLists[index].MaxSize() += 1;
}
// start和end是输出型参数
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum > 0);
// 只获取到一个内存对象,直接返回即可
// 而如果申请到一段内存,需要头插到对应的自由链表中
if (actualNum == 1)
{
assert(start == end); // 确保只拿到一个size大小的内存对象
return start;
}
else
{
_freeLists[index].PushRange(NextObj(start), end);
return start;
}
}
page cache整体设计
申请内存:
- 当 central cache 向 page cache 申请内存时,page cache 先检查对应位置有没有 span,如果没有则向更大页寻找一个 span,如果找到则分裂成两个。比如:申请的是 4 页 page,4 页 page 后面没有挂 span,则向后面寻找更大的 span,假设在 10 页 page 位置找到一个 span,则将10 页 page span 分裂为一个 4 页 page span 和一个 6 页page span。
- 如果找到 _spanList[128] 都没有合适的 span,则向系统使用 mmap、brk 或者是 VirtualAlloc 等方式申请 128 页 page span 挂在自由链表中,再重复 1 中的过程。
- 需要注意的是 central cache 和 page cache 的核心结构都是 spanlist 的哈希桶,但是他们是有本质区别的,central cache 中哈希桶,是按跟 thread cache 一样的大小对齐关系映射的,它的 spanlist 中挂的 span 中的内存都被按映射关系切好链接成小块内存的自由链表。而 page cache 中的 spanlist 则是按下标桶号映射的,也就是说第 i 号桶中挂的 span 都是 i 页内存,也就是 page cache 中的 span 是不会被切分成小块内存的。
释放内存:
如果 central cache 释放回一个 span(span 的 _useCount 为 0),则依次寻找 span 的前后 page id 的没有在使用的空闲 span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的 span,减少内存碎片。
注意:PageCache 中的锁不是桶锁,而是一个将 PageCache 锁起来的大锁,防止多个线程同时访问 PageCache 从而出现线程安全问题。其实加桶锁也是可以的,但是加桶锁的话,会有频繁的加锁和解锁,导致效率降低。
Common.h
// Common.h
static const size_t NPAGES = 129; // ThreadCache申请最大的内存是256KB(128页可以被切分成4份)
static const size_t PAGE_SHIFT = 13; // 一页的大小(2的13次方)
class SizeClass
{
public:
// 计算一次向系统获取几个页
// 单个对象 8Byte
// ...
// 单个对象 256KB
// 单个对象越大,要的页数就越大
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
};
class SpanList
{
public:
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->_next == _head;
}
void PushFront(Span* span)
{
Insert(Begin(), span);
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
};
PageCache.h
// PageCache.h
#pragma once
#include "Common.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 获取一个K页的span
Span* NewSpan(size_t k);
private:
PageCache()
{}
PageCache(const PageCache&) = delete;
SpanList _spanLists[NPAGES];
static PageCache _sInst; // 单例模式
public:
std::mutex _pageMtx;
};
// PageCache.cpp
#include "PageCache.h"
PageCache PageCache::_sInst;
// 获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
// 先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
return _spanLists[k].PopFront();
}
// 检查后面的桶有没有span,如果有可以将它进行切分
for (int i = k; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
// 在nSpan的头部切k页出来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
// 切分好后需要将nSpan头插到对应的哈希桶中
_spanLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
}
// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
Span* bigSpan = new Span;
// 系统给内存也是按照一定的对齐规则来给的
void* ptr = SystemAlloc(NPAGES - 1);
// 将起始地址转成对应的页号
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
// 将申请到的128页的span头插
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k); // 代码复用
}
CentralCache.cpp
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 查看当前的spanlists中是否还有小块内存的span
Span* it = list.Begin();
while (it != list.End())
{
// 如果it->_freeList不为空,则说明这个Span中
// 的自由链表挂有小块内存
if (it->_freeList != nullptr)
return it;
else
it = it->_next;
}
// 先将CentralCache的桶锁解掉,如果其他
// 线程将内存还回来,不会出现阻塞的情况
list._mtx.unlock();
// 走到这里说明,spanlist中的span都没有小块内存了
// 需要向PageCache申请内存
// 给PageCache加上大锁,防止其他线程访问
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
PageCache::GetInstance()->_pageMtx.unlock();
// 对获取到的span进行切分的过程不需要加锁,因为其他线程无法拿到这个span
// 计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
// 把大块内存切成自由链表链接起来
// 先切一块下来去做头,方便尾插
// 这里尾插可以做到内存连续
span->_freeList = start;
start += size;
void* tail = span->_freeList;
while (start < end)
{
NextObj(tail) = start;
tail = start; // tail = NextObj(tail);
start += size;
}
NextObj(tail) = nullptr;
// 大块内存切分好后,将大块内存头插到对应的哈希桶中
// 将span头插到桶中时,需要加锁
list._mtx.lock();
list.PushFront(span);
return span;
}
thread cache回收内存
当 thread cache 中的某个桶太长了,会将一部分的内存归还给 central cache,其他线程就可以向 central cache 申请该大小的内存或者合并出更大块的内存以缓解内存碎片问题。tcmalloc 中的 thread cache 回收内存考虑了两个因素:一是桶的长度,二是 thread cache 中总内存的大小。而我们的项目考虑比较简单一点,只考虑桶的长度。
因为释放 thread cache 中的内存,所以需要知道链表的长度是多长,那么就要记录链表的长度 _size了。
class FreeList
{
public:
void PopRange(void*& start, void*& end, size_t n)
{
assert(n <= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
size_t Size()
{
return _size;
}
private:
size_t _size = 0;
};
ThreadCache.cpp
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找对映射的自由链表桶,将释放的内存头插到自由链表中
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
// 当链表的长度大于一次批量申请的内存时就
// 开始还一段list给central cache
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}
// 释放对象时,链表过长时,回收内存回到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
// start和end是输出型参数
void* start = nullptr;
void* end = nullptr;
// PopRange函数会将end置成nullptr
list.PopRange(start, end, list.MaxSize());
// 尝试合并出更大的内存
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
central cache回收内存
因为 thread cache 还回来的内存不知道是在哪一个 span 中的,所以我们要通过映射表找到这些内存对应的 span。
PageCache.h
// PageCache.h
class PageCache
{
public:
// 获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
private:
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
}
// PageCache.cpp
// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}
}
// 获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
// 先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
// 建立id和span的映射,方便central cache回收内存时,查找对应的span
for (int i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
// 检查后面的桶有没有span,如果有可以将它进行切分
for (int i = k + 1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
// 在nSpan的头部切k页出来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
// 切分好后需要将nSpan头插到对应的哈希桶中
_spanLists[nSpan->_n].PushFront(nSpan);
// 建立id和span的映射,方便central cache回收内存时,查找对应的span
for (int i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
Span* bigSpan = new Span;
// 系统给内存也是按照一定的对齐规则来给的
void* ptr = SystemAlloc(NPAGES - 1);
// 将起始地址转成对应的页号
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
// 将申请到的128页的span头插
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k); // 代码复用
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 待完善
}
CentralCache.h
class CentralCache
{
public:
// 将一定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t size);
};
// 将一定数量的对象释放到span跨度
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
NextObj(start) = span->_freeList;
span->_freeList = start;
--span->_useCount;
// 说明span切分出去的所有小块内存都还回来了
// 这个span可以回收给page cache,以合并出更
// 大的页缓解内存碎片问题
if (span->_useCount == 0)
{
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_prev = nullptr;
span->_next = nullptr;
// 释放span给page cache时,使用page cache的锁就可以了
// 这时把桶锁解掉,其他线程访问这个桶了
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
// 释放空闲span回到Pagecache,并合并相邻的span
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
page cache回收内存
span 还回来时,要尝试对 span 前后的页进行合并,以缓解外碎片问题!要进行合并,就要知道前后页所在的 span 是否在使用中,那么 span 就加多一个字段 _isUse 来表示 span 是否正在使用。注意:不能通过 _usrCount 是否等于 0 来判断 span 是否正在使用,因为有可能这个 span 是刚申请处理的,其 _useCount 等于 0。
// PageCache.cpp
// 获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
// 先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
// 建立id和span的映射,方便central cache回收内存时,查找对应的span
for (int i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
// 检查后面的桶有没有span,如果有可以将它进行切分
for (int i = k + 1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
// 在nSpan的头部切k页出来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
// 切分好后需要将nSpan头插到对应的哈希桶中
_spanLists[nSpan->_n].PushFront(nSpan);
// 建立nSpan的首尾页号与nSpan的映射,方便page cache合并查找
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
// 建立id和span的映射,方便central cache回收内存时,查找对应的span
for (int i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
Span* bigSpan = new Span;
// 系统给内存也是按照一定的对齐规则来给的
void* ptr = SystemAlloc(NPAGES - 1);
// 将起始地址转成对应的页号
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
// 将申请到的128页的span头插
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k); // 代码复用
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 对span前后的页尝试进行合并,以缓解内存碎片问题
// 向前合并
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
//前面的页号没有,不合并了
if (ret == _idSpanMap.end())
break;
Span* prevSpan = ret->second;
// 前面的span正在使用,不合并了
if (prevSpan->_isUse == true)
break;
// 合并出超过128页的span没办法管理,不合并了
if (prevSpan->_n + span->_n > NPAGES - 1)
break;
// 进行合并
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
delete prevSpan;
}
// 向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
// 后面的页号不存在
if (ret == _idSpanMap.end())
break;
Span* nextSpan = ret->second;
// 后面的span正在使用
if (nextSpan->_isUse == true)
break;
// 合并出超过128页的span没办法管理
if (nextSpan->_n + span->_n > NPAGES - 1)
break;
// 进行合并
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
// 将span的首尾页号和span建立映射
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
}
大于256KB的大块内存申请问题
Common.h
// 将内存还给操作系统
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}
class SizeClass
{
// 将申请的内存bytes上调到align的倍数(align为对齐数)
static inline size_t _RoundUp(size_t bytes, size_t align)
{
return ((bytes + align - 1) & ~(align - 1));
}
static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
// 按照一页来对齐
return _RoundUp(size, 1 << PAGE_SHIFT);
}
}
};
ConcurrentAlloc.h
#pragma once
#include "Common.h"
#include "ThreadCache.h"
#include "PageCache.h"
// 高并发内存池申请和释放内存的接口
static void* ConcurrentAlloc(size_t size)
{
if (size > MAX_BYTES)
{
// size大于MAX_BYTES时,按照页进行对齐
size_t alignSize = SizeClass::RoundUp(size);
size_t kpage = alignSize >> PAGE_SHIFT;
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
PageCache::GetInstance()->_pageMtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
// 通过TLS,每个线程就可以无锁地获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
// 一般Free接口是不需要指明释放内存的大小的,后面会优化
static void ConcurrentFree(void* ptr, size_t size)
{
if (size > MAX_BYTES)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
PageCache.cpp
// 获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
// 大于128页的内存直接向操作系统申请
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
// 建立页号和span的映射可以更好地释放该内存
Span* span = new Span;
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
_idSpanMap[span->_pageId] = span;
return span;
}
// ...
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 大于128页的内存直接返给操作系统
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
delete span;
return;
}
// ...
}
使用定长内存池配合脱离new
只需要将使用 new 的地方更换成我们实现的对象池即可,主要在 PageCache.cpp 中
释放对象时优化为不传对象大小
前面进行释放内存时,都需要指明内存的大小。那么现在我们只需要在 span 中再加一个字段 _objSize 就行了,在申请 span 的时候就确定这块 span 切分成小块内存的大小 _objSize。然后通过释放内存的地址 ptr 计算出所在的页号,然后就可以确定要释放内存的大小了。
多线程环境下对比malloc测试
#include"ConcurrentAlloc.h"
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
size_t mallocCosttime = malloc_costtime;
size_t freeCosttime = free_costtime;
size_t tatolCosttime = mallocCosttime + freeCosttime;
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, mallocCosttime);
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
nworks, rounds, ntimes, freeCosttime);
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, tatolCosttime);
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(16));
//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
size_t mallocCosttime = malloc_costtime;
size_t freeCosttime = free_costtime;
size_t tatolCosttime = mallocCosttime + freeCosttime;
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, mallocCosttime);
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, freeCosttime);
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, tatolCosttime);
}
int main()
{
size_t n = 10000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}
申请内存的大小是固定的
通过上面的两张图片可以看到,高并发内存池的效率并没有比 malloc 的效率高,甚至比 malloc 要慢,那它是慢在哪里呢?那么我们现在就来分析一下性能的瓶颈究竟在哪。
使用基数树进行优化
其实高并发内存池的性能较低,主要是因为锁的竞争和释放等等。
tcmalloc中针对 32、64 位系统设计了不同基数树(考虑这个映射表占用内存的大小):
- 32 位系统使用二级的 Radix Tree(PageMap2),Two-level radix tree
- 64位系统使用三级的 Radix Tree(PageMap3),Three-level radix tree
- PageMap 负责判断并调用 PageMap2 与 PageMap3
一层基数树和两层基数树都是用于 32 位系统,并且它们建立映射关系所需要的空间都是 2 MB。两者的区别是一层基数树直接将用于映射的数组直接开好,而二层基数树的第一层是直接开好的,而第二层的空间是用到了的时候才会开好。基数树的空间使用更加灵活,只有当需要用到某节点时才会去创建它。
为什么哈希表需要加锁?因为一个进程在读哈希表时,其他进程可以向哈希表中写入数据,这样就会导致线程安全问题,所以需要加锁进行保护。
而为什么基数树不用加锁呢?如下图所示:
👉项目完整代码👈
👉扩展学习👈
将高并发内存池的代码打包成静态库和动态库给别人使用。
注意:将代码打包成库时,代码中一定不能包含 main 函数。
实际中我们测试了,当前实现的并发内存池比 malloc / free 是更加高效的,那么我们能否替换到系统调用 malloc 呢?实际上是可以的。
不同平台替换方式不同。 基于 unix 的系统上的 glibc,使用了 weak alias 的方式替换。具体来说是因为这些入口函数都被定义成了 weak symbols,再加上 gcc 支持 alias attribute,所以替换就变成了这种通用形式:
void* malloc(size_t size) THROW attribute__ ((alias (tc_malloc)))
因此所有 malloc 的调用都跳转到了 tc_malloc 的实现。具体参考这里:GCC attribute 之weak,alias属性。
有些平台不支持这样的东西,需要使用 hook 的钩子技术来做。关于hook请看这里:hook。