【C++】-- 高并发内存池

news2025/1/12 1:33:12

高并发内存池

  • 项目介绍
    • 池化技术
    • 内存池
  • 定长内存池的实现
  • 整体框架
  • threadcache
    • threadcache整体设计
    • threadcache哈希桶映射对齐规则
    • TLS无锁访问
  • centralcache
    • centralcache整体设计
    • centralcache结构设计
    • centralcache的实现
  • pagecache
    • pagecache整体设计
    • pagecache中获取Span
  • 回收内存
    • centralcache回收内存
    • centralcache回收内存
    • pagecache回收内存
  • 项目完善
    • 大于256KB的大块内存申请
    • 使用定长内存池配合脱离使用new
    • 释放对象时优化为不传对象大小
    • 使用基数树进行优化
      • 使用基数树进行优化代码实现
  • 多线程环境下对比malloc测试
  • 项目源码

项目介绍

高并发的内存池 参考Google的开源项目 tcmalloc (Thread-Caching Malloc) 实现了高效的多线程内存管理 用于替换系统的内存分配相关函数malloc和free

该项目是模拟实现出一个mini版的tcmalloc 为了学习tcmalloc的精华

该主要涉及C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的技术

池化技术

池化技术 就是程序先向系统申请过量的资源 然后进行管理 以备不时之需

因为申请和释放资源都有较大的开销 不如提前申请一些资源放入 池 中 当需要资源时直接从 池 中获取 不需要时就将该资源重新放回 池 中即可 这样使用可以提高程序的运行效率 在计算机中 除了内存池之外 还有连接池 线程池 对象池等

内存池

内存池是指程序预先向操作系统申请一块足够大的内存 此后 当程序中需要申请内存时 不是直接向操作系统申请 而是直接从内存池中获取 当释放内存的时候 并不是真正将内存返回给操作系统 而是将内存返回给内存池 当程序退出时(或某个特定时间) 内存池才将之前申请的内存真正释放

内存池的作用

内存池主要解决的是效率的问题 它能够避免让程序频繁的向系统申请和释放内存 其次 内存池作为系统的内存分配器 还需要尝试解决内存碎片的问题

内部碎片和外部碎片

  • 外部碎片是一些空闲的小块内存区域 由于这些内存空间不连续 以至于合计的内存足够 但是不能满足一些内存分配申请需求
  • 内部碎片是由于一些对齐的需求 导致分配出去的空间中一些内存无法被使用

内存池尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片

malloc

C/C++中我们要动态申请内存并不是直接去堆申请的 而是通过malloc函数去申请 包括new也是封装了malloc函数

申请内存块时先调用malloc 再去向操作系统申请内存 malloc实际就是一个内存池
在这里插入图片描述

malloc的实现方式有很多种 一般不同编译器平台用的不同 Linux下的gcc用的是glibc中的ptmalloc。

定长内存池的实现

malloc就是一个通用的内存池 在什么场景下都可以使用 并不是针对某种场景专门设计的 所以malloc在什么场景下都不会有很高的性能

定长内存池就是对固定大小内存块的申请和释放的内存池 由于定长内存池只需要支持固定大小内存块的申请和释放 因此我们可以将其性能做到极致 并且在实现定长内存池时不需要考虑内存碎片等问题

定长内存池会作为高并发内存池的一个基础组件

如何实现定长

可以使用非类型模板参数,使得在该内存池中申请到的对象的大小都是N

template<size_t N>
class ObjectPool
{};

定长内存池也叫做对象池 在创建对象池时 对象池可以根据传入的对象类型的大小来实现 定长 因此我们可以通过使用模板参数来实现 定长

template<class T>
class ObjectPool
{};

如何直接向堆申请空间

在Windows下 可以调用VirtualAlloc函数 在Linux下 可以调用brk或mmap函数

#ifdef _WIN32
	#include <Windows.h>
#else   //Linux下的头文件
	    //...
#endif

//直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif
	if (ptr == nullptr)                //申请失败抛异常
	{
		throw std::bad_alloc();
	}
	
	return ptr;
}

定长内存池中的成员变量

向堆申请到的大块内存 我们可以用一个指针来对其进行管理 还需要用一个变量来记录这块内存的长度

因为指针的类型决定了指针向前或向后走一步有多大距离 对于字符指针来说 当我们需要向后移动n个字节时 直接对字符指针进行加n操作即可

可以将这些释放回来的定长内存块链接成一个链表 为了管理释放回来的内存块的自由链表 我们还需要一个指向自由链表的指针。
在这里插入图片描述

因此,定长内存池当中包含三个成员变量:

  • _memory 指向大块内存的指针
  • _remainBytes 大块内存切分过程中剩余字节数
  • _freeList 还回来过程中链接的自由链表的头指针

内存池管理释放的对象

对于还回来的定长内存块 我们可以用链表将其链接起来 我们可以让内存块的前4个字节(32位平台)或8个字节(64位平台) 存储下一个内存块的起始地址

因此在向自由链表插入被释放的内存块时 进行链表的头插

在这里插入图片描述
如何让一个指针在32位平台下解引用后能向后访问4个字节 在64位平台下解引用后能向后访问8个字节

我们这里需要的是一个指向指针的指针 这里使用二级指针就行了

void*& NextObj(void* ptr)
{
	return (*(void**)ptr);
}

在释放对象时 我们应该显示调用该对象的析构函数清理该对象 如果不对其进行清理那么就可能会导致内存泄漏

//释放对象
void Delete(T* obj)
{
	//显示调用T的析构函数清理对象
	obj->~T();

	//将释放的对象头插到自由链表
	NextObj(obj) = _freeList;
	_freeList = obj;
}

内存池如何申请对象

当我们申请对象时 内存池应该优先把还回来的内存块对象再次重复利用 因此如果自由链表当中有内存块 就直接从自由链表头删一个内存块进行返回

如果自由链表当中没有内存块 我们就在大块内存中切出定长的内存块进行返回 当内存块切出后及时更新_memory指针的指向 以及_remainBytes的值

在这里插入图片描述
由于当内存块释放时我们需要将内存块链接到自由链表当中 因此我们必须保证切出来的对象至少能够存储下一个地址 所以当对象的大小小于当前所在平台指针的大小时 需要按指针的大小进行内存块的切分

当大块内存已经不足以切分出一个对象时 我们就应该调用封装的SystemAlloc函数 再次向堆申请一块内存空间 并及时更新_memory以及_remainBytes

//申请对象
T* New()
{
	T* obj = nullptr;

	//优先使用还回来的内存块对象
	if (_freeList != nullptr)
	{
		//从自由链表头删一个对象
		obj = (T*)_freeList;
		_freeList = NextObj(_freeList);
	}
	else
	{
		//保证对象能够存储得下地址
		size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
		//剩余内存不够一个对象大小时,则重新开大块空间
		if (_remainBytes < objSize)
		{
			_remainBytes = 128 * 1024;
			_memory = (char*)SystemAlloc(_remainBytes >> 13);
			if (_memory == nullptr)
			{
				throw std::bad_alloc();
			}
		}
		
		//从大块内存中切出objSize字节的内存
		obj = (T*)_memory;
		_memory += objSize;
		_remainBytes -= objSize;
	}
	
	//定位new,显示调用T的构造函数初始化
	new(obj)T;

	return obj;
}

当内存块切分出来后 我们应该使用定位new 显示调用该对象的构造函数对其进行初始化

定长内存池整体代码

//定长内存池
template<class T>
class ObjectPool
{
public:
	//申请对象
	T* New()
	{
		T* obj = nullptr;

		if (_freeList != nullptr)
		{
			//从自由链表头删一个对象
			obj = (T*)_freeList;
			_freeList = NextObj(_freeList);
		}
		else
		{
			//保证对象能够存储得下地址
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			//剩余内存不够一个对象大小时,则重新开大块空间
			if (_remainBytes < objSize)
			{
				_remainBytes = 128 * 1024;
				
				_memory = (char*)SystemAlloc(_remainBytes >> 13);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}
			//从大块内存中切出objSize字节的内存
			obj = (T*)_memory;
			_memory += objSize;
			_remainBytes -= objSize;
		}
		
		//定位new,显示调用T的构造函数初始化
		new(obj)T;

		return obj;
	}
	//释放对象
	void Delete(T* obj)
	{
		//显示调用T的析构函数清理对象
		obj->~T();

		//将释放的对象头插到自由链表
		NextObj(obj) = _freeList;
		_freeList = obj;
	}
private:
	char* _memory = nullptr;     //指向大块内存的指针
	size_t _remainBytes = 0;     //大块内存在切分过程中剩余字节数

	void* _freeList = nullptr;   //还回来过程中链接的自由链表的头指针
};

性能测试代码

struct TreeNode
{
	int _val;
	TreeNode* _left;
	TreeNode* _right;
	TreeNode()
		:_val(0)
		, _left(nullptr)
		, _right(nullptr)
	{}
};

void TestObjectPool()
{
	// 申请释放的轮次
	const size_t Rounds = 3;
	// 每轮申请释放多少次
	const size_t N = 1000000;
	std::vector<TreeNode*> v1;
	v1.reserve(N);

	//malloc和free
	size_t begin1 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v1.push_back(new TreeNode);
		}
		for (int i = 0; i < N; ++i)
		{
			delete v1[i];
		}
		v1.clear();
	}
	size_t end1 = clock();

	//定长内存池
	ObjectPool<TreeNode> TNPool;
	std::vector<TreeNode*> v2;
	v2.reserve(N);
	size_t begin2 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v2.push_back(TNPool.New());
		}
		for (int i = 0; i < N; ++i)
		{
			TNPool.Delete(v2[i]);
		}
		v2.clear();
	}
	size_t end2 = clock();

	cout << "new cost time:" << end1 - begin1 << endl;
	cout << "object pool cost time:" << end2 - begin2 << endl;
}

我们先用new申请若干个TreeNode对象 然后再用delete将这些对象释放掉 通过clock函数得到整个过程消耗的时间

然后将其中的new和delete替换为定长内存池当中的New和Delete 再通过clock函数得到该过程消耗的时间

在这里插入图片描述

整体框架

应用场景

现代很多的开发环境都是多核多线程 因此在申请内存的时 必然存在激烈的锁竞争问题 malloc在并发场景下可能会因为频繁的加锁和解锁导致效率降低 而该项目的原型tcmalloc实现的就是一种在多线程高并发场景下更胜一筹的内存池

在实现内存池时我们一般需要考虑到 效率问题 和 内存碎片 的问题 但对于高并发内存池来说 我们还需要考虑在多线程环境下的 锁竞争 问题

高并发内存池整体框架设计

在这里插入图片描述

高并发内存池的三个部分

  • thread cache 每个线程独有 用于小于等于256KB的内存分配 每个线程独享一个thread cache
  • central cache 所有线程所共享 当thread cache需要内存时会按需从central cache中获取内存 central cache也会在合适的时机对thread cache中的内存进行回收
  • page cache 页缓存中存储的内存是 以页为单位进行存储及分配 当central cache需要内存时 page cache会分配出一定数量的页分配给central cache 而page cache也会在合适的时机对central cache中的内存进行回收 并将回收的内存进行合并 组成更大的连续内存块 缓解内存碎片问题

三个部分的作用

  • thread cache 主要解决锁竞争的问题 每个线程独享自己的thread cache 当自己的thread cache中有内存时 只要在自己的thread cache申请内存

  • central cache 主要起到一个居中调度的作用 每个线程的thread cache需要内存时从central cache获取 而当thread cache的内存多了就会将内存还给central cache

  • page cache 负责提供以页为单位的大块内存 当central cache需要内存时就会去向page cache申请 而当page cache没有内存了就会直接去堆上按页申请内存块

每个线程都有一个属于自己的thread cache 所以线程在thread cache申请内存时是不需要加锁的 而一次性申请大于256KB内存的情况是很少的 因此大部分情况下申请内存时都是无锁的

多线程的thread cache可能会同时找central cache申请内存 此时就会涉及线程安全的问题 因此在访问central cache时是需要加锁的 但central cache实际上是一个哈希桶的结构 只有当多个线程同时访问同一个桶时才需要加锁

threadcache

threadcache整体设计

要支持申请和释放不同大小的内存块,那么我们就需要多个自由链表来管理释放回来的内存块 因此thread cache实际上一个哈希桶结构 每个桶中存放的都是一个自由链表

thread cache支持小于等于256KB内存的申请 如果我们将每种字节数的内存块都用一个自由链表进行管理的话 我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存

这时我们可以让这些字节数按照某种规则进行对齐 当线程申请1~8字节的内存时会直接给出8字节 9 ~ 6字节会直接给出16字节

在这里插入图片描述
因此当线程要申请某一大小的内存块时 就需要经过某种计算得到对齐后的字节数 进而找到对应的哈希桶 如果该哈希桶中的自由链表中有内存块 那就从自由链表中头删一个内存块进行返回 如果该自由链表已经为空了 就需要向下一层的central cache进行获取

我们可以对自由链表这个结构进行封装 提供Push和Pop两个成员函数 对应的操作分别是将对象插入到自由链表(头插)和从自由链表获取一个对象(头删)

//管理切分好的小对象的自由链表
class FreeList
{
public:
	//将释放的对象头插到自由链表
	void Push(void* obj)
	{
		assert(obj);

		//头插
		NextObj(obj) = _freeList;
		_freeList = obj;
	}

	//从自由链表头部获取一个对象
	void* Pop()
	{
		assert(_freeList);

		//头删
		void* obj = _freeList;
		_freeList = NextObj(_freeList);
		return obj;
	}

private:
	void* _freeList = nullptr; //自由链表
};

thread cache实际就是一个数组 数组中存储的是一个个的自由链表 至于这个数组中到底存储了多少个自由链表 就需要看我们在进行字节数对齐时具体用的是什么映射对齐规则

threadcache哈希桶映射对齐规则

如何进行对齐

这些内存块是会被链接到自由链表上 因为我们要保证这些内存块无论是在32位平台下还是64位平台下都至少能够存储得下一个指针 因此一开始按8字节进行对齐是最合适的

	// [1,128]					8byte对齐	    freelist[0,16)
	// [128+1,1024]				16byte对齐	    freelist[16,72)
	// [1024+1,8*1024]			128byte对齐	    freelist[72,128)
	// [8*1024+1,64*1024]		1024byte对齐     freelist[128,184)
	// [64*1024+1,256*1024]		8*1024byte对齐   freelist[184,208)

空间浪费率

整体控制在10%左右的内碎片浪费

对齐和映射的相关函数

我们需要提供两个对应的函数 分别用于获取某一字节数对齐后的字节数 以及该字节数对应的哈希桶下标

//管理对齐和映射等关系
class SizeClass
{
public:
	//获取向上对齐后的字节数
	static inline size_t RoundUp(size_t bytes);
	//获取对应哈希桶的下标
	static inline size_t Index(size_t bytes);
};

SizeClass类当中的成员函数设置为静态成员函数 否则我们在调用这些函数时就需要通过对象去调用 并且对于这些可能会频繁调用的函数 可以考虑将其设置为内联函数

在获取某一字节数向上对齐后的字节数时 可以先判断该字节数属于哪一个区间 然后再通过调用一个子函数进行进一步处理

//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
	if (bytes <= 128)
	{
		return _RoundUp(bytes, 8);
	}
	else if (bytes <= 1024)
	{
		return _RoundUp(bytes, 16);
	}
	else if (bytes <= 8 * 1024)
	{
		return _RoundUp(bytes, 128);
	}
	else if (bytes <= 64 * 1024)
	{
		return _RoundUp(bytes, 1024);
	}
	else if (bytes <= 256 * 1024)
	{
		return _RoundUp(bytes, 8 * 1024);
	}
	else
	{
		assert(false);
		return -1;
	}
}

编写一个子函数 该子函数需要通过对齐数计算出某一字节数对齐后的字节数

方法1

static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
	size_t alignSize = 0;
	if (bytes%alignNum != 0)
	{
		alignSize = (bytes / alignNum + 1)*alignNum;
	}
	else
	{
		alignSize = bytes;
	}
	return alignSize;
}

方法2(位运算效率高)

static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
	return ((bytes + alignNum - 1)&~(alignNum - 1));
}

在获取某一字节数对应的哈希桶下标时 也是先判断该字节数属于哪一个区间 然后再通过调用一个子函数进行进一步处理

//获取对应哈希桶的下标
static inline size_t Index(size_t bytes)
{
	//每个区间有多少个自由链表
	static size_t groupArray[4] = { 16, 56, 56, 56 };
	if (bytes <= 128)
	{
		return _Index(bytes, 3);
	}
	else if (bytes <= 1024)
	{
		return _Index(bytes - 128, 4) + groupArray[0];
	}
	else if (bytes <= 8 * 1024)
	{
		return _Index(bytes - 1024, 7) + groupArray[0] + groupArray[1];
	}
	else if (bytes <= 64 * 1024)
	{
		return _Index(bytes - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2];
	}
	else if (bytes <= 256 * 1024)
	{
		return _Index(bytes - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3];
	}
	else
	{
		assert(false);
		return -1;
	}
}

编写一个子函数来继续进行处理

//一般写法
static inline size_t _Index(size_t bytes, size_t alignNum)
{
	size_t index = 0;
	if (bytes%alignNum != 0)
	{
		index = bytes / alignNum;
	}
	else
	{
		index = bytes / alignNum - 1;
	}
	return index;
}

了提高效率下面也提供了一个用位运算来解决的方法 此时我们不是传入该字节数的对齐数 而是将对齐数写成2的n次方的形式后 将这个n值进行传入 比如对齐数是8 传入的就是3

static inline size_t _Index(size_t bytes, size_t alignShift)
{
	return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}

ThreadCache类

按照上述的对齐规则 thread cache中桶的个数 也就是自由链表的个数是208 以及thread cache允许申请的最大内存大小256KB

//小于等于MAX_BYTES,就找thread cache申请
//大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;

现在就可以对ThreadCache类进行定义了

class ThreadCache
{
public:
	//申请内存对象
	void* Allocate(size_t size);

private:
	FreeList _freeLists[NFREELISTS]; //哈希桶
};

在thread cache申请对象时 通过所给字节数计算出对应的哈希桶下标 如果桶中自由链表不为空 则从该自由链表中取出一个对象进行返回 但如果此时自由链表为空 那么就需要从central cache进行获取 这里的FetchFromCentralCache函数是thread cache类中的一个成员函数

//申请内存对象
void* ThreadCache::Allocate(size_t size)
{
	assert(size <= MAX_BYTES);
	size_t alignSize = SizeClass::RoundUp(size);
	size_t index = SizeClass::Index(size);
	if (!_freeLists[index].Empty())
	{
		return _freeLists[index].Pop();
	}
	else
	{
		return FetchFromCentralCache(index, alignSize);
	}
}

TLS无锁访问

每个线程都有一个自己独享的thread cache 我们不能将这个thread cache创建为全局的 因为全局变量是所有线程共享的

要实现每个线程无锁的访问属于自己的thread cache 我们需要用到线程局部存储TLS(Thread Local Storage) 这是一种变量的存储方法 使用该存储方法的变量只能当前线程访问不能被其他线程访问

//TLS - Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

当该线程调用申请内存的接口时会创建自己的thread cache

//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
	pTLSThreadCache = new ThreadCache;
}

centralcache

centralcache整体设计

当线程申请某一大小的内存时 如果thread cache中对应的自由链表不为空 那么直接取出一个内存块进行返回 但如果此时对应位置的自由链表为空 thread cache就需要向central cache申请内存

central cache的结构与thread cache一样 它们都是哈希桶结构 并且它们遵循的对齐映射规则一样 这样当thread cache的某个桶中没有内存了 就可以直接到central cache中对应的哈希桶里去取

central cache与thread cache的不同

thread cache是每个线程独享的 而central cache是所有线程共享的 每个线程的thread cache没有内存了都会去找central cache 因此在访问central cache时需要加锁

central cache在加锁时用的是桶锁 只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争 如果是多个线程同时访问central cache的不同桶时就不会存在锁竞争

thread cache的每个桶中挂的是一个个切好的内存块,而central cache的每个桶中挂的是一个个的span

在这里插入图片描述
每个span管理的是一个以页为单位的大块内存 每个桶里面的若干span是按照带头双链表的形式链接起来 并且每个span里面有一个自由链表指针 这个自由链表里面挂的就是一个个切好了的内存块 根据其所在的哈希桶这些内存块被切成了对应的大小

centralcache结构设计

页号类型

#ifdef _WIN64
	typedef unsigned long long PAGE_ID;
#elif _WIN32
	typedef size_t PAGE_ID;
#else
	//linux
#endif

在windows操作系统下 32位下 _WIN32有定义 _WIN64没有定义 而在64位下 _WIN32和_WIN64都有定义 因此 在条件编译时 我们应该先判断_WIN64是否定义 再判断_WIN32是否定义

span的结构

central cache的每个桶里挂的是一个个的span span是一个管理以页为单位的大块内存

//管理以页为单位的大块内存
struct Span
{
	PAGE_ID _pageId = 0;        //大块内存起始页的页号
	size_t _n = 0;              //页的数量

	Span* _next = nullptr;      //双链表结构
	Span* _prev = nullptr;

	//size_t _objSize = 0;        //切好的小对象的大小
	
	size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数
	void* _freeList = nullptr;  //切好的小块内存的自由链表

	//bool _isUse = false;        //是否在被使用
};

对于span管理的以页为单位的大块内存 我们需要知道这块内存具体在哪一个位置 便于之后page cache进行前后页的合并 因此span结构当中会记录所管理大块内存起始页的页号

至于每一个span管理的到底是多少个页 这并不是固定的 因此span结构当中有一个_n成员 该成员代表着该span管理的页的数量

每个span管理的大块内存 都会被切成相应大小的内存块挂到当前span的自由链表中 因此span结构中需要存储切好的小块内存的自由链表

_useCount表示当前span中切好的小块内存被分配给thread cache的计数 当某个span的_useCount计数变为0时 代表当前span切出去的内存块对象全部还回来了 此时central cache就可以将这个span还给page cache

每个桶当中的span是以双链表的形式组织起来的 当我们需要将某个span归还给page cache时 就可以很方便的将该span从双链表结构中移出

Span的双链表结构

//带头双向循环链表
class SpanList
{
public:
	SpanList()
	{
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	}
	void Insert(Span* pos, Span* newSpan)
	{
		assert(pos);
		assert(newSpan);

		Span* prev = pos->_prev;

		prev->_next = newSpan;
		newSpan->_prev = prev;

		newSpan->_next = pos;
		pos->_prev = newSpan;
	}
	void Erase(Span* pos)
	{
		assert(pos);
		assert(pos != _head); //不能删除哨兵位的头结点

		Span* prev = pos->_prev;
		Span* next = pos->_next;

		prev->_next = next;
		next->_prev = prev;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶锁
};

从双链表删除的span会还给下一层的page cache 相当于只是把这个span从双链表中移除 因此不需要对删除的span进行delete操作

central cache的结构

central cache的映射规则和thread cache是一样的 因此central cache里面哈希桶的个数也是208 central cache每个哈希桶中存储是双链表结构

class CentralCache
{
public:
	//...
private:
	SpanList _spanLists[NFREELISTS];
};

central cache和thread cache的映射规则一样 当thread cache的某个桶没有内存了 可以直接去central cache对应的哈希桶进行申请

centralcache的实现

central cache的实现方式

每个线程都有一个属于自己的thread cache 用TLS来实现每个线程无锁的访问属于自己的thread cache 而central cache和page cache在整个进程中只有一个 可以将其设置为单例模式

单例模式可以保证系统中该类只有一个实例 并提供一个访问它的全局访问点 该实例被所有程序模块共享 单例模式又分为饿汉模式和懒汉模式

//单例模式
class CentralCache
{
public:
	//提供一个全局访问点
	static CentralCache* GetInstance()
	{
		return &_sInst;
	}
private:
	SpanList _spanLists[NFREELISTS];
private:
	CentralCache() //构造函数私有
	{}
	CentralCache(const CentralCache&) = delete; //防拷贝

	static CentralCache _sInst;
};

CentralCache类当中需要有一个CentralCache类型的静态的成员变量 当程序运行起来后就立马创建该对象 在此后的程序中就只有这一个单例

静态的成员函数在类外初始化

CentralCache CentralCache::_sInst;

慢开始反馈调节算法

当thread cache向central cache申请内存时 central cache应该给出多少个对象才合适

这里采用了一个慢开始反馈调节算法 当thread cache向central cache申请内存时 如果申请的是较小的对象 那么 多给一点 如果申请的是较大的对象 可以少给一点

通过下面这个函数 我们就可以根据所需申请的对象的大小计算出具体给出的对象个数 并且可以将给出的对象个数控制到2~512个之间

//管理对齐和映射等关系的类
class SizeClass
{
public:
	//thread cache一次从central cache获取对象的上限
	static size_t NumMoveSize(size_t size)
	{
		assert(size > 0);
	
		//对象越小,计算出的上限越高
		//对象越大,计算出的上限越低
		int num = MAX_BYTES / size;
		if (num < 2)
			num = 2;
		if (num > 512)
			num = 512;
	
		return num;
	}
};

就算申请的是小对象 一次性给出512个也是比较多的 我们可以在FreeList结构中增加一个叫做_maxSize的成员变量 该变量的初始值设置为1 并且提供一个公有成员函数用于获取这个变量

//管理切分好的小对象的自由链表
class FreeList
{
public:
	size_t& MaxSize()
	{
		return _maxSize;
	}

private:
	void* _freeList = nullptr; //自由链表
	size_t _maxSize = 1;
};

此时当thread cache申请对象时 我们会比较_maxSize和计算得出的值 取出其中的较小值作为本次申请对象的个数 如果本次采用的是_maxSize的值 那么会将thread cache中该自由链表的_maxSize的值进行加一

因此 thread cache第一次向central cache申请某大小的对象时 申请到的都是一个 但下一次thread cache再向central cache申请同样大小的对象时 最终就会申请到两个 直到该自由链表中_maxSize的值增长到超过计算出的值后就不会继续增长了

从中心缓存获取对象

每次thread cache向central cache申请对象时 我们先通过慢开始反馈调节算法计算出本次应该申请的对象的个数 然后再向central cache进行申请

如果thread cache最终申请到对象的个数就是一个 那么直接将该对象返回 因为thread cache要向central cache申请对象 其实由于某个线程向thread cache申请对象但thread cache当中没有 才导致thread cache要向central cache申请对象 因此central cache将对象返回给thread cache后 thread cache会再将该对象返回给申请对象的线程

如果thread cache最终申请到的是多个对象 那么将第一个对象返回之外 还需要将剩下的对象挂到thread cache对应的哈希桶当中

//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
	//慢开始反馈调节算法
	size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
	if (batchNum == _freeLists[index].MaxSize())
	{
		_freeLists[index].MaxSize() += 1;
	}
	
	void* start = nullptr;
	void* end = nullptr;
	size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
	
	assert(actualNum >= 1); //至少有一个

	if (actualNum == 1) //申请到对象的个数是一个,则直接将这一个对象返回即可
	{
		assert(start == end);
		return start;
	}
	else //申请到对象的个数是多个,还需要将剩下的对象挂到thread cache中对应的哈希桶中
	{
		_freeLists[index].PushRange(NextObj(start), end);
		return start;
	}
}

从中心缓存获取一定数量的对象

这里我们要从central cache获取n个指定大小的对象 这些对象从central cache对应哈希桶的某个span中取出来的 因此取出来的这n个对象是链接在一起的 我们只需要得到这段链表的头和尾即可 这里可以采用输出型参数进行获取

//从central cache获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();  //加锁
	
	//在对应哈希桶中获取一个非空的span
	Span* span = GetOneSpan(_spanLists[index], size);
	assert(span); 			  		//span不为空
	assert(span->_freeList);        //span当中的自由链表也不为空

	//从span中获取n个对象
	//如果不够n个,有多少拿多少
	start = span->_freeList;
	end = span->_freeList;
	size_t actualNum = 1;
	while (NextObj(end)&&n - 1)
	{
		end = NextObj(end);
		actualNum++;
		n--;
	}
	
	span->_freeList = NextObj(end);    //取完后剩下的对象继续放到自由链表
	NextObj(end) = nullptr;            //取出的一段链表的表尾置空
	span->_useCount += actualNum;      //更新被分配给thread cache的计数

	_spanLists[index]._mtx.unlock();   //解锁
	return actualNum;
}

由于central cache是所有线程共享的 所以我们在访问central cache中的哈希桶时 需要先给对应的哈希桶加上桶锁 在获取到对象后再将桶锁解掉

在向central cache获取对象时 先是在central cache对应的哈希桶中获取到一个非空的span 然后从这个span的自由链表中取出n个对象即可 但可能这个非空的span的自由链表当中对象的个数不足n个 这时该自由链表当中有多少个对象就给多少

插入一段范围的对象到自由链表

如果thread cache从central cache获取到的对象个数是大于1的 那么我们还需要将剩下的对象插入到thread cache中对应的哈希桶中 为了能让自由链表支持插入一段范围的对象 我们还需要在FreeList类中增加一个对应的成员函数

//管理切分好的小对象的自由链表
class FreeList
{
public:
	//插入一段范围的对象到自由链表
	void PushRange(void* start, void* end)
	{
		assert(start);
		assert(end);

		//头插
		NextObj(end) = _freeList;
		_freeList = start;
	}
private:
	void* _freeList = nullptr; //自由链表
	size_t _maxSize = 1;
};

pagecache

pagecache整体设计

page cache与central cache一样 它们都是哈希桶的结构 并且page cache的每个哈希桶中里挂的也是一个个的span 这些span也是按照双链表的结构链接起来的

page cache的映射规则与central cache 和 thread cache都不相同 page cache的哈希桶映射规则采用的是直接定址法

central cache每个桶中的span被切成了一个个对应大小的对象 以供thread cache申请 而page cache当中的span是没有被进一步切小的 当central cache没有span时 向page cache申请的是某一固定页数的span central cache自己进行切分

在这里插入图片描述
这里我们最大挂128页的span 为了让桶号与页号对应起来 我们可以将第0号桶空出来不用 因此我们需要将哈希桶的个数设置为129。

//page cache中哈希桶的个数
static const size_t NPAGES = 129;

因为线程申请单个对象最大是256KB 而128页可以被切成4个256KB的对象 因此比较合适

在page cache获取一个n页的span的过程

如果central cache要获取一个n页的span 那我们就可以在page cache的第n号桶中取出一个span返回给central cache即可 但如果第n号桶中没有span了 这时我们在后面的桶中寻找span

直接向堆申请以页为单位的内存时 我们应该尽量申请大块一点的内存块 因为此时申请到的内存是连续的 当线程需要内存时我们可以将其切小后分配给线程 当线程将内存释放后我们又可以将其合并成大块的连续内存

当第n号桶中没有span时 我们可以继续找第n+1号桶 可以将n+1页的span切分成一个n页的span和一个1页的span 这时我们就可以将n页的span返回 而将切分后1页的span挂到1号桶中 但如果后面的桶当中都没有span 这时我们就只能向堆申请一个128页的内存块 并将其用一个span结构管理起来 然后将128页的span切分成n页的span和128-n页的span 其中n页的span返回给central cache 而128-n页的span就挂到第128-n号桶中

我们每次向堆申请的都是128页大小的内存块 central cache要的这些span实际都是由128页的span切分出来的

page cache的实现

当每个线程的thread cache没有内存时都会向central cache申请 此时多个线程的thread cache如果访问的不是central cache的同一个桶 那么这些线程是可以同时进行访问 这时central cache的多个桶就可能同时向page cache申请内存 所以page cache也是存在线程安全问题的 因此在访问page cache时也必须要加锁

但是在page cache这里我们不能使用桶锁 因为当central cache向page cache申请内存时 page cache可能会将其他桶当中大页的span切小后再给central cache 此外 当central cache将某个span归还给page cache时 page cache也会尝试将该span与其他桶当中的span进行合并

在访问page cache时 我们可能需要访问page cache中的多个桶 如果page cache用桶锁就会出现大量频繁的加锁和解锁 因此我们在访问page cache时使用一个大锁将整个page cache给锁住

page cache在整个进程中也是只能存在一个的 因此需要将其设置为单例模式

//单例模式
class PageCache
{
public:
	//提供一个全局访问点
	static PageCache* GetInstance()
	{
		return &_sInst;
	}
private:
	SpanList _spanLists[NPAGES];
	std::mutex _pageMtx;     //大锁
private:
	PageCache()               //构造函数私有
	{}
	PageCache(const PageCache&) = delete; //防拷贝

	static PageCache _sInst;
};

当程序运行起来后我们就立马创建该对象

PageCache PageCache::_sInst;

pagecache中获取Span

获取一个非空的span

thread cache向central cache申请对象时 central cache需要先从对应的哈希桶中获取到一个非空的span 然后从这个非空的span中取出若干对象返回给thread cache

先遍历central cache对应哈希桶当中的双链表 如果该双链表中有非空的span 那么直接将该span进行返回即可 为了方便遍历这个双链表

//带头双向循环链表
class SpanList
{
public:
	Span* Begin()
	{
		return _head->_next;
	}
	Span* End()
	{
		return _head;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶锁
};

但如果遍历双链表后发现双链表中没有span 或该双链表中的span都为空 那么此时central cache就需要向page cache申请内存块

那具体是向page cache申请多大的内存块 我们可以根据具体所需对象的大小来决定 就像之前我们根据对象的大小计算出 thread cache一次向central cache申请对象的个数上限 现在我们是根据对象的大小计算出 central cache一次应该向page cache申请几页的内存块

我们可以先根据对象的大小计算出 thread cache一次向central cache申请对象的个数上限 然后将这个上限值乘以单个对象的大小 就算出了具体需要多少字节 最后再将这个算出来的字节数转换为页数 如果转换后不够一页 那么我们就申请一页 否则转换出来是几页就申请几页 也就是说 central cache向page cache申请内存时 要求申请到的内存尽量能够满足thread cache向central cache申请时的上限

class SizeClass
{
public:
	//central cache一次向page cache获取多少页
	static size_t NumMovePage(size_t size)
	{
		size_t num = NumMoveSize(size); //计算出thread cache一次向central cache申请对象的个数上限
		size_t nPage = num*size;        //num个size大小的对象所需的字节数

		nPage >>= PAGE_SHIFT;    		//将字节数转换为页数
		if (nPage == 0) 
		{				//至少给一页
			nPage = 1;
		}
		
		return nPage;
	}
};

代码中的PAGE_SHIFT代表页大小转换偏移 我们这里以页的大小为8K为例 PAGE_SHIFT的值就是13

//页大小转换偏移,即一页定义为2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;

当central cache申请到若干页的span后 还需要将这个span切成一个个对应大小的对象挂到该span的自由链表当中

为了找到一个span所管理的内存块首先需要计算出该span的起始地址 我们可以用这个span的起始页号乘以一页的大小即可得到这个span的起始地址 然后用这个span的页数乘以一页的大小就可以得到这个span所管理的内存块的大小 用起始地址加上内存块的大小即可得到这块内存块的结束位置

明确了这块内存的起始和结束位置后 我们就可以进行切分了 根据所需对象的大小 每次从大块内存切出一块固定大小的内存块 尾插 到span的自由链表中即可 因为将切好的对象尾插到自由链表 这些对象看起来是按照链式结构链接起来的 而实际它们在物理上是连续的 这时当我们把这些连续内存分配给某个线程使用时 可以提高该线程的CPU缓存利用率

//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
	//1、先在spanList中寻找非空的span
	Span* it = spanList.Begin();
	while (it != spanList.End())
	{
		if (it->_freeList != nullptr)
		{
			return it;
		}
		else
		{
			it = it->_next;
		}
	}

	//2、spanList中没有非空的span,只能向page cache申请
	Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
	//计算span的大块内存的起始地址和大块内存的大小(字节数)
	char* start = (char*)(span->_pageId << PAGE_SHIFT);
	size_t bytes = span->_n << PAGE_SHIFT;

	//把大块内存切成size大小的对象链接起来
	char* end = start + bytes;
	//先切一块下来去做尾,方便尾插
	span->_freeList = start;
	start += size;
	void* tail = span->_freeList;
	
	//尾插
	while (start < end)
	{
		NextObj(tail) = start;
		tail = NextObj(tail);
		start += size;
	}
	
	NextObj(tail) = nullptr; //尾的指向置空
	
	//将切好的span头插到spanList
	spanList.PushFront(span);

	return span;
}

当把span切好后 需要将这个切好的span挂到central cache的对应哈希桶中 因此SpanList类还需要提供一个接口 用于将一个span插入到该双链表中

//带头双向循环链表
class SpanList
{
public:
	void PushFront(Span* span)
	{
		Insert(Begin(), span);
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶锁
};

获取一个k页的span

当我们调用上述的GetOneSpan从central cache的某个哈希桶获取一个非空的span时 如果遍历哈希桶中的双链表后发现双链表中没有span 或该双链表中的span都为空 那么此时central cache就需要向page cache申请若干页的span

因为page cache是直接按照页数进行映射的 因此我们要从page cache获取一个k页的span 就应该直接先去找page cache的第k号桶 如果第k号桶中有span 那我们直接头删一个span返回给central cache就行 需要再给SpanList类添加对应的Empty和PopFront函数

//带头双向循环链表
class SpanList
{
public:
	bool Empty()
	{
		return _head == _head->_next;
	}
	Span* PopFront()
	{
		Span* front = _head->_next;
		Erase(front);
		return front;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶锁
};

如果page cache的第k号桶中没有span 我们就应该继续找后面的桶 只要后面任意一个桶中有一个n页span 我们就可以将其切分成一个k页的span和一个n-k页的span 然后将切出来k页的span返回给central cache 再将n-k页的span挂到page cache的第n-k号桶中

但如果后面的桶中也都没有span 需要向堆申请一个128页的span 在向堆申请内存时直接调用我们封装的SystemAlloc函数

向堆申请内存后得到的是这块内存的起始地址 此时我们需要将该地址转换为页号 由于我们向堆申请内存时都是按页进行申请的 因此我们直接将该地址除以一页的大小即可得到对应的页号

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		return _spanLists[k].PopFront();
	}
	
	//检查一下后面的桶里面有没有span,如果有可以将其进行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的头部切k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//将剩下的挂到对应映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			return kSpan;
		}
	}
	
	//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//尽量避免代码重复,递归调用自己
	return NewSpan(k);
}

我们可以先将申请到的128页的span挂到page cache对应的哈希桶中 然后再递归调用该函数 此时在往后找span时就一定会在第128号桶中找到该span 然后进行切分

当central cache向page cache申请内存时 central cache对应的哈希桶是处于加锁的状态 在访问page cache前 先把central cache对应的桶锁解掉 虽然此时central cache的这个桶当中是没有内存供其他thread cache申请 但thread cache除了申请内存还会释放内存 如果在访问page cache前将central cache对应的桶锁解掉 那么此时当其他thread cache想要归还内存到central cache的这个桶时就不会被阻塞

回收内存

centralcache回收内存

当thread cache中某个自由链表太长时 会将自由链表当中的这些对象还给central cache中的span

还给central cache的这些对象不一定都是属于同一个span的 central cache中的每个哈希桶当中可能都不止一个span 因此当我们计算出还回来的对象应该还给central cache的哪一个桶后 还需要知道这些对象到底应该还给这个桶当中的哪一个span

根据对象地址找到对应的span

某个页当中的所有地址除以页的大小都等该页的页号 但是我们还是不能知道这个对象到底属于哪一个span

我们可以建立页号和span之间的映射 由于这个映射关系在page cache进行span的合并时也需要用到 因此我们直接将其存放到page cache里面 这时我们就需要在PageCache类当中添加一个映射关系 这里可以用C++当中的unordered_map进行实现 并且添加一个函数接口 用于让central cache获取这里的映射关系

class PageCache
{
public:
	//获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);
private:
	std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};

每当page cache分配span给central cache时 都需要记录一下页号和span之间的映射关系 此后当thread cache还对象给central cache时 才知道应该具体还给哪一个span

因此当central cache在调用NewSpan接口向page cache申请k页的span时 page cache在返回这个k页的span给central cache之前 应该建立这k个页号与该span之间的映射关系

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	
	//检查一下后面的桶里面有没有span,如果有可以将其进行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的头部切k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//将剩下的挂到对应映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);

			//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	
	//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//尽量避免代码重复,递归调用自己
	return NewSpan(k);
}

此时我们就可以通过对象的地址找到该对象对应的span了 直接将该对象的地址除以页的大小得到页号 然后在unordered_map当中找到其对应的span即可

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

当我们要通过某个页号查找其对应的span时 该页号与其span之间的映射一定是建立过的 如果此时我们没有在unordered_map当中找到 则说明我们之前的代码逻辑有问题 因此当没有找到对应的span时可以直接用断言结束程序

centralcache回收内存

当thread cache还对象给central cache时 就可以依次遍历这些对象 将这些对象插入到其对应span的自由链表当中 并且及时更新该span的_usseCount计数

在thread cache还对象给central cache的过程中 如果central cache中某个span的_useCount减到0 说明这个span分配出去的对象全部都还回来了 就可以将这个span再进一步还给page cache

//将一定数量的对象还给对应的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock(); //加锁
	while (start)
	{
		void* next = NextObj(start); //记录下一个
		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		//将对象头插到span的自由链表
		NextObj(start) = span->_freeList;
		span->_freeList = start;

		span->_useCount--; //更新被分配给thread cache的计数
		if (span->_useCount == 0) //说明这个span分配出去的对象全部都回来了
		{
			//此时这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并
			_spanLists[index].Erase(span);
			span->_freeList = nullptr; //自由链表置空
			span->_next = nullptr;
			span->_prev = nullptr;

			//释放span给page cache时,使用page cache的锁就可以了,这时把桶锁解掉
			_spanLists[index]._mtx.unlock(); //解桶锁
			
			PageCache::GetInstance()->_pageMtx.lock(); //加大锁
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock(); //解大锁
			
			_spanLists[index]._mtx.lock(); //加桶锁
		}

		start = next;
	}

	_spanLists[index]._mtx.unlock(); //解锁
}

如果要把某个span还给page cache 我们需要先将这个span从central cache对应的双链表中移除 然后再将该span的自由链表置空 因为page cache中的span是不需要切分成一个个的小对象的 以及该span的前后指针也都应该置空 因为之后要将其插入到page cache对应的双链表中 但span当中记录的起始页号以及它管理的页数是不能清除的 否则对应内存块就找不到了

central cache还span给page cache时也存在锁的问题 此时需要先将central cache中对应的桶锁解掉 然后再加上page cache的大锁之后才能进入page cache进行相关操作 当处理完毕回到central cache时 除了将page cache的大锁解掉 还需要立刻获得central cache对应的桶锁 然后将还未还完对象继续还给central cache中对应的span

pagecache回收内存

如果central cache中有某个span的_useCount减到0 那么central cache就需要将这个span还给page cache

page cache只需将还回来的span挂到对应的哈希桶上就行了 但为了缓解内存碎片的问题 page cache还需要尝试将还回来的span与其他空闲的span进行合并

page cache进行前后页的合并

合并的过程可以分为向前合并和向后合并 如果还回来的span的起始页号是num 该span所管理的页数是n 那么在向前合并时 就需要判断第num-1页对应span是否空闲 如果空闲则可以将其进行合并 并且合并后还需要继续向前尝试进行合并 直到不能进行合并为止 而在向后合并时 就需要判断第num+n页对应的span是否空闲 如果空闲则可以将其进行合并 并且合并后还需要继续向后尝试进行合并 直到不能进行合并为止

因此page cache在合并span时 是需要通过页号获取到对应的span的 这就是我们要把页号与span之间的映射关系存储到page cache

当我们通过页号找到其对应的span时 这个span此时可能挂在page cache 也可能挂在central cache 而在合并时我们只能合并挂在page cache的span 因为挂在central cache的span当中的对象正在被其他线程使用

可是我们不能通过span结构当中的_useCount成员来判断某个span到底是在central cache还是在page cache 因为当central cache刚向page cache申请到一个span时 这个span的_useCount就是等于0 这时可能当我们正在对该span进行切分的时候 page cache就把这个span拿去进行合并 这不合理

我们可以在span结构中再增加一个_isUse成员 用于标记这个span是否正在被使用 而当一个span结构被创建时我们默认该span是没有被使用

//管理以页为单位的大块内存
struct Span
{
	PAGE_ID _pageId = 0;        //大块内存起始页的页号
	size_t _n = 0;              //页的数量

	Span* _next = nullptr;      //双链表结构
	Span* _prev = nullptr;

	//size_t _objSize = 0;        //切好的小对象的大小
	size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数
	void* _freeList = nullptr;  //切好的小块内存的自由链表

	bool _isUse = false;        //是否在被使用
};

因此当central cache向page cache申请到一个span时 需要立即将该span的_isUse改为true

由于在合并page cache当中的span时 需要通过页号找到其对应的span 而一个span是在被分配给central cache时 才建立的各个页号与span之间的映射关系 因此page cache当中的span也需要建立页号与span之间的映射关系

与central cache中的span不同的是 在page cache中 只需建立一个span的首尾页号与该span之间的映射关系 因为当一个span在尝试进行合并时 如果是往前合并 那么只需要通过一个span的尾页找到这个span 如果是向后合并 那么只需要通过一个span的首页找到这个span 所以在进行合并时我们只需要用到span与其首尾页之间的映射关系就够了

当我们申请k页的span时 如果是将n页的span切成了一个k页的span和一个n-k页的span 我们除了需要建立k页span中每个页与该span之间的映射关系之外 还需要建立剩下的n-k页的span与其首尾页之间的映射关系

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	
	//检查一下后面的桶里面有没有span,如果有可以将其进行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的头部切k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//将剩下的挂到对应映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
			_idSpanMap[nSpan->_pageId] = nSpan;
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	
	//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//尽量避免代码重复,递归调用自己
	return NewSpan(k);
}

此时page cache当中的span就都与其首尾页之间建立了映射关系 现在我们就可以进行span的合并了

//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	//对span的前后页,尝试进行合并,缓解内存碎片问题
	//1、向前合并
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		auto ret = _idSpanMap.find(prevId);
		//前面的页号没有(还未向系统申请),停止向前合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//前面的页号对应的span正在被使用,停止向前合并
		Span* prevSpan = ret->second;
		if (prevSpan->_isUse == true)
		{
			break;
		}
		//合并出超过128页的span无法进行管理,停止向前合并
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//进行向前合并
		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		//将prevSpan从对应的双链表中移除
		_spanLists[prevSpan->_n].Erase(prevSpan);

		delete prevSpan;
	}
	//2、向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		auto ret = _idSpanMap.find(nextId);
		//后面的页号没有(还未向系统申请),停止向后合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//后面的页号对应的span正在被使用,停止向后合并
		Span* nextSpan = ret->second;
		if (nextSpan->_isUse == true)
		{
			break;
		}
		//合并出超过128页的span无法进行管理,停止向后合并
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//进行向后合并
		span->_n += nextSpan->_n;

		//将nextSpan从对应的双链表中移除
		_spanLists[nextSpan->_n].Erase(nextSpan);

		delete nextSpan;
	}
	//将合并后的span挂到对应的双链表当中
	_spanLists[span->_n].PushFront(span);
	//建立该span与其首尾页的映射
	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_n - 1] = span;
	//将该span设置为未被使用的状态
	span->_isUse = false;
}

在向前或向后进行合并的过程中:

  • 如果没有通过页号获取到其对应的span 说明对应到该页的内存块还未申请 此时停止合并
  • 如果通过页号获取到了其对应的span 但该span处于被使用的状态 停止合并
  • 如果合并后大于128页 则不进行本次合并

在合并span时 由于这个span是在page cache的某个哈希桶的双链表当中的 因此在合并后需要将其从对应的双链表中移除 然后再将这个被合并了的span结构进行delete

在合并结束后 除了将合并后的span挂到page cache对应哈希桶的双链表当中 还需要建立该span与其首位页之间的映射关系 便于此后合并出更大的span

项目完善

大于256KB的大块内存申请

申请过程

每个线程的thread cache是用于申请小于等于256KB的内存的 而对于大于256KB的内存 我们可以考虑直接向page cache申请 但page cache中最大的页也就只有128页 因此如果是大于128页的内存申请 就只能直接向堆申请

x ≤ 256KB(32页) 向thread cache申请
32页< x ≤128页 向page cache申请
x >128页 向堆申请

//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
	if (bytes <= 128)
	{
		return _RoundUp(bytes, 8);
	}
	else if (bytes <= 1024)
	{
		return _RoundUp(bytes, 16);
	}
	else if (bytes <= 8 * 1024)
	{
		return _RoundUp(bytes, 128);
	}
	else if (bytes <= 64 * 1024)
	{
		return _RoundUp(bytes, 1024);
	}
	else if (bytes <= 256 * 1024)
	{
		return _RoundUp(bytes, 8 * 1024);
	}
	else
	{
		//大于256KB的按页对齐
		return _RoundUp(bytes, 1 << PAGE_SHIFT);
	}
}

现在对于之前的申请逻辑就需要进行修改 当申请对象的大小大于256KB时 就不用向thread cache申请了 这时先计算出按页对齐后实际需要申请的页数 然后通过调用NewSpan申请指定页数的span

static void* ConcurrentAlloc(size_t size)
{
	if (size > MAX_BYTES) //大于256KB的内存申请
	{
		//计算出对齐后需要申请的页数
		size_t alignSize = SizeClass::RoundUp(size);
		size_t kPage = alignSize >> PAGE_SHIFT;

		//向page cache申请kPage页的span
		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kPage);
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		return ptr;
	}
	else
	{
		//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
		if (pTLSThreadCache == nullptr)
		{
			pTLSThreadCache = new ThreadCache;
		}
		cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;

		return pTLSThreadCache->Allocate(size);
	}
}

申请大于256KB的内存时 会直接调用page cache当中的NewSpan函数进行申请 因此这里我们需要再对NewSpan函数进行改造 当需要申请的内存页数大于128页时 就直接向堆申请对应页数的内存块 而如果申请的内存页数是小于128页 就在page cache中进行申请 因此当申请大于256KB的内存调用NewSpan函数时也是需要加锁

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0);
	if (k > NPAGES - 1) //大于128页直接找堆申请
	{
		void* ptr = SystemAlloc(k);
		Span* span = new Span;
		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;
		//建立页号与span之间的映射
		_idSpanMap[span->_pageId] = span;
		return span;
	}
	
	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	//检查一下后面的桶里面有没有span,如果有可以将其进行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的头部切k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//将剩下的挂到对应映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
			_idSpanMap[nSpan->_pageId] = nSpan;
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//尽量避免代码重复,递归调用自己
	return NewSpan(k);
}

释放过程

当释放对象时 我们需要先找到该对象对应的span 但是在释放对象时我们只知道该对象的起始地址 这也就是我们在申请大于256KB的内存时 也要给申请到的内存建立span结构 并建立起始页号与该span之间的映射关系的原因 此时我们就可以通过释放对象的起始地址计算出起始页号 进而通过页号找到该对象对应的span

static void ConcurrentFree(void* ptr, size_t size)
{
	if (size > MAX_BYTES) //大于256KB的内存释放
	{
		Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);

		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

因此page cache在回收span时也需要进行判断 如果该span的大小是小于等于128页的 那么直接还给page cache page cache会尝试对其进行合并 而如果该span的大小是大于128页 那么说明该span是直接向堆申请 直接将这块内存释放给堆 然后将这个span结构进行delete

//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	if (span->_n > NPAGES - 1) //大于128页直接释放给堆
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		delete span;
		return;
	}
	//对span的前后页,尝试进行合并,缓解内存碎片问题
	//1、向前合并
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		auto ret = _idSpanMap.find(prevId);
		//前面的页号没有(还未向系统申请),停止向前合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//前面的页号对应的span正在被使用,停止向前合并
		Span* prevSpan = ret->second;
		if (prevSpan->_isUse == true)
		{
			break;
		}
		//合并出超过128页的span无法进行管理,停止向前合并
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//进行向前合并
		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		//将prevSpan从对应的双链表中移除
		_spanLists[prevSpan->_n].Erase(prevSpan);

		delete prevSpan;
	}
	//2、向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		auto ret = _idSpanMap.find(nextId);
		//后面的页号没有(还未向系统申请),停止向后合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//后面的页号对应的span正在被使用,停止向后合并
		Span* nextSpan = ret->second;
		if (nextSpan->_isUse == true)
		{
			break;
		}
		//合并出超过128页的span无法进行管理,停止向后合并
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//进行向后合并
		span->_n += nextSpan->_n;

		//将nextSpan从对应的双链表中移除
		_spanLists[nextSpan->_n].Erase(nextSpan);

		delete nextSpan;
	}
	//将合并后的span挂到对应的双链表当中
	_spanLists[span->_n].PushFront(span);
	//建立该span与其首尾页的映射
	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_n - 1] = span;
	//将该span设置为未被使用的状态
	span->_isUse = false;
}

使用定长内存池配合脱离使用new

tcmalloc是要在高并发场景下替代malloc进行内存申请 因此tcmalloc在实现的时 其内部是不能调用malloc函数 我们当前的代码中存在通过new获取到的内存 而new在底层实际上就是封装了malloc

为了完全脱离掉malloc函数 此时我们之前实现的定长内存池就起作用了 代码中使用new时基本都是为Span结构的对象申请空间 而span对象基本都是在page cache层创建的 因此我们可以在PageCache类当中定义一个_spanPool 用于span对象的申请和释放

//单例模式
class PageCache
{
public:
	//...
private:
	ObjectPool<Span> _spanPool;
};

然后将代码中使用new的地方替换为调用定长内存池当中的New函数 将代码中使用delete的地方替换为调用定长内存池当中的Delete函数

//申请span对象
Span* span = _spanPool.New();
//释放span对象
_spanPool.Delete(span);

此外,每个线程第一次申请内存时都会创建其专属的thread cache 而这个thread cache目前也是new出来的 我们也需要对其进行替换

//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
	static std::mutex tcMtx;
	static ObjectPool<ThreadCache> tcPool;
	tcMtx.lock();
	pTLSThreadCache = tcPool.New();
	tcMtx.unlock();
}

将用于申请ThreadCache类对象的定长内存池定义为静态的 保持全局只有一个 让所有线程创建自己的thread cache时 都在个定长内存池中申请内存就行了

从该定长内存池中申请内存时需要加锁 防止多个线程同时申请自己的ThreadCache对象而导致线程安全问题

在SpanList的构造函数中也用到了new 因为SpanList是带头循环双向链表 所以在构造期间我们需要申请一个span对象作为双链表的头结点

//带头双向循环链表
class SpanList
{
public:
	SpanList()
	{
		_head = _spanPool.New();
		_head->_next = _head;
		_head->_prev = _head;
	}
private:
	Span* _head;
	static ObjectPool<Span> _spanPool;
};

由于每个span双链表只需要一个头结点 因此将这个定长内存池定义为静态的 保持全局只有一个 让所有span双链表在申请头结点时 都在一个定长内存池中申请内存就行了

释放对象时优化为不传对象大小

当我们使用free函数释放内存时 只需要传入指向这块内存的指针

而我们目前实现的内存池 在释放对象时除了需要传入指向该对象的指针 还需要传入该对象的大小

  • 如果释放的是大于256KB的对象,需要根据对象的大小来判断这块内存到底应该还给page cache还是应该直接还给堆
  • 如果释放的是小于等于256KB的对象 需要根据对象的大小计算出应该还给thread cache的哪一个哈希桶

我们需要建立对象地址与对象大小之间的映射 由于现在可以通过对象的地址找到其对应的span 而span的自由链表中挂的都是相同大小的对象 因此我们可以在Span结构中再增加一个_objSize成员 该成员代表着这个span管理的内存块被切成的一个个对象的大小

//管理以页为单位的大块内存
struct Span
{
	PAGE_ID _pageId = 0;        //大块内存起始页的页号
	size_t _n = 0;              //页的数量

	Span* _next = nullptr;      //双链表结构
	Span* _prev = nullptr;

	size_t _objSize = 0;        //切好的小对象的大小
	size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数
	void* _freeList = nullptr;  //切好的小块内存的自由链表

	bool _isUse = false;        //是否在被使用
};

而所有的span都是从page cache中拿出来的 因此每当我们调用NewSpan获取到一个k页的span时 就应该将这个span的_objSize保存下来

Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_objSize = size;

在central cache中获取非空span时 如果central cache对应的桶中没有非空的span 此时会调用NewSpan获取一个k页的span 另一处是当申请大于256KB内存时 会直接调用NewSpan获取一个k页的span

此时当我们释放对象时 就可以直接从对象的span中获取到该对象的大小 准确来说获取到的是对齐以后的大小

static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;
	if (size > MAX_BYTES) //大于256KB的内存释放
	{
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

读取映射关系时的加锁问题

我们将页号与span之间的映射关系是存储在PageCache类当中 当我们访问这个映射关系时是需要加锁 因为STL容器是不保证线程安全的

对于当前代码 如果我们此时正在page cache进行相关操作 那么访问这个映射关系是安全的 因为当进入page cache之前是需要加锁 因此可以保证此时只有一个线程在进行访问

但如果我们是在central cache访问这个映射关系 或是在调用ConcurrentFree函数释放内存时访问这个映射关系 那么就存在线程安全的问题 因为此时可能其他线程正在page cache当中进行某些操作 并且该线程此时可能也在访问这个映射关系 因此当我们在page cache外部访问这个映射关系时是需要加锁的

实际就是在调用page cache对外提供访问映射关系的函数时需要加锁 这里我们可以考虑使用C++当中的unique_lock

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号

	std::unique_lock<std::mutex> lock(_pageMtx); //构造时加锁,析构时自动解锁
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

使用基数树进行优化

基数树实际上就是一个分层的哈希表 根据所分层数不同可分为单层基数树、二层基数树、三层基数树等

单层基数树

单层基数树实际采用的就是直接定址法 每一个页号对应span的地址就存储数组中在以该页号为下标的位置

最坏的情况下我们需要建立所有页号与其span之间的映射关系 因此这个数组中元素个数应该与页号的数目相同 数组中每个位置存储的就是对应span的指针

//单层基数树
template <int BITS>
class TCMalloc_PageMap1
{
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap1()
	{
		size_t size = sizeof(void*) << BITS; //需要开辟数组的大小
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按页对齐后的大小
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申请空间
		memset(array_, 0, size); //对申请到的内存进行清理
	}
	void* get(Number k) const
	{
		if ((k >> BITS) > 0) //k的范围不在[0, 2^BITS-1]
		{
			return NULL;
		}
		return array_[k]; //返回该页号对应的span
	}
	void set(Number k, void* v)
	{
		assert((k >> BITS) == 0); //k的范围必须在[0, 2^BITS-1]
		array_[k] = v; //建立映射
	}
private:
	void** array_; //存储映射关系的数组
	static const int LENGTH = 1 << BITS; //页的数目
};

此时当我们需要建立映射时就调用set函数 需要读取映射关系时 就调用get函数

二层基数树

这里还是以32位平台下 一页的大小为8K为例 此时存储页号最多需要19个比特位 而二层基数树实际上就是把这19个比特位分为两次进行映射

比如用前5个比特位在基数树的第一层进行映射 映射后得到对应的第二层 然后用剩下的比特位在基数树的第二层进行映射 映射后最终得到该页号对应的span指针

//二层基数树
template <int BITS>
class TCMalloc_PageMap2
{
private:
	static const int ROOT_BITS = 5;                //第一层对应页号的前5个比特位
	static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层存储元素的个数
	static const int LEAF_BITS = BITS - ROOT_BITS; //第二层对应页号的其余比特位
	static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层存储元素的个数
	//第一层数组中存储的元素类型
	struct Leaf
	{
		void* values[LEAF_LENGTH];
	};
	Leaf* root_[ROOT_LENGTH]; //第一层数组
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap2()
	{
		memset(root_, 0, sizeof(root_)); //将第一层的空间进行清理
		PreallocateMoreMemory(); //直接将第二层全部开辟
	}
	void* get(Number k) const
	{
		const Number i1 = k >> LEAF_BITS;        //第一层对应的下标
		const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
		if ((k >> BITS) > 0 || root_[i1] == NULL) //页号值不在范围或没有建立过映射
		{
			return NULL;
		}
		return root_[i1]->values[i2]; //返回该页号对应span的指针
	}
	void set(Number k, void* v)
	{
		const Number i1 = k >> LEAF_BITS;        //第一层对应的下标
		const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
		assert(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v; //建立该页号与对应span的映射
	}
	//确保映射[start,start_n-1]页号的空间是开辟好了的
	bool Ensure(Number start, size_t n)
	{
		for (Number key = start; key <= start + n - 1;)
		{
			const Number i1 = key >> LEAF_BITS;
			if (i1 >= ROOT_LENGTH) //页号超出范围
				return false;
			if (root_[i1] == NULL) //第一层i1下标指向的空间未开辟
			{
				//开辟对应空间
				static ObjectPool<Leaf> leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();
				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查
		}
		return true;
	}
	void PreallocateMoreMemory()
	{
		Ensure(0, 1 << BITS); //将第二层的空间全部开辟好
	}
};

因此在二层基数树中有一个Ensure函数 当需要建立某一页号与其span之间的映射关系时 需要先调用该Ensure函数确保用于映射该页号的空间是开辟了的 如果没有开辟则会立即开辟

而在32位平台下 就算将二层基数树第二层的数组全部开辟出来也就消耗了2 M 的空间 内存消耗也不算太多 因此我们可以在构造二层基数树时就把第二层的数组全部开辟出来

三层基数树

上面一层基数树和二层基数树都适用于32位平台 而对于64位的平台就需要用三层基数树了 三层基数树与二层基数树类似 三层基数树实际上就是把存储页号的若干比特位分为三次进行映射

此时只有当要建立某一页号的映射关系时 再开辟对应的数组空间 而没有建立映射的页号就可以不用开辟其对应的数组空间 此时就能在一定程度上节省内存空间

//三层基数树
template <int BITS>
class TCMalloc_PageMap3
{
private:
	static const int INTERIOR_BITS = (BITS + 2) / 3;       //第一、二层对应页号的比特位个数
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二层存储元素的个数
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三层对应页号的比特位个数
	static const int LEAF_LENGTH = 1 << LEAF_BITS;         //第三层存储元素的个数
	struct Node
	{
		Node* ptrs[INTERIOR_LENGTH];
	};
	struct Leaf
	{
		void* values[LEAF_LENGTH];
	};
	Node* NewNode()
	{
		static ObjectPool<Node> nodePool;
		Node* result = nodePool.New();
		if (result != NULL)
		{
			memset(result, 0, sizeof(*result));
		}
		return result;
	}
	Node* root_;
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap3()
	{
		root_ = NewNode();
	}
	void* get(Number k) const
	{
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
		const Number i3 = k & (LEAF_LENGTH - 1);                    //第三层对应的下标
		//页号超出范围,或映射该页号的空间未开辟
		if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL)
		{
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回该页号对应span的指针
	}
	void set(Number k, void* v)
	{
		assert(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
		const Number i3 = k & (LEAF_LENGTH - 1);                    //第三层对应的下标
		Ensure(k, 1); //确保映射第k页页号的空间是开辟好了的
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立该页号与对应span的映射
	}
	//确保映射[start,start+n-1]页号的空间是开辟好了的
	bool Ensure(Number start, size_t n)
	{
		for (Number key = start; key <= start + n - 1;)
		{
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下标值超出范围
				return false;
			if (root_->ptrs[i1] == NULL) //第一层i1下标指向的空间未开辟
			{
				//开辟对应空间
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}
			if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二层i2下标指向的空间未开辟
			{
				//开辟对应空间
				static ObjectPool<Leaf> leafPool;
				Leaf* leaf = leafPool.New();
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查
		}
		return true;
	}
	void PreallocateMoreMemory()
	{}
};

因此当我们要建立某一页号的映射关系时 需要先确保存储该页映射的数组空间是开辟好了的 也就是调用代码中的Ensure函数 如果对应数组空间未开辟则会立马开辟对应的空间

使用基数树进行优化代码实现

现在我们用基数树对代码进行优化 此时将PageCache类当中的unorder_map用基数树进行替换即可 由于当前是32位平台 因此这里可以用1层基数树

//单例模式
class PageCache
{
public:
	//...
private:
	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
};

此时当我们需要建立页号与span的映射时 就调用基数树当中的set函数

_idSpanMap.set(span->_pageId, span);

而当我们需要读取某一页号对应的span时 就调用基数树当中的get函数

Span* ret = (Span*)_idSpanMap.get(id);

并且现在PageCache类向外提供的 用于读取映射关系的MapObjectToSpan函数内部就不需要加锁了

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
	Span* ret = (Span*)_idSpanMap.get(id);
	assert(ret != nullptr);
	return ret;
}

多线程环境下对比malloc测试

在多线程场景下对比malloc进行测试

测试代码

// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(malloc(16));
					v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);

	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);

	printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}


// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(ConcurrentAlloc(16));
					v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);

	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);

	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" << endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;

	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" << endl;

	return 0;
}

测试结果
在这里插入图片描述

项目源码

高并发内存池项目,gitee地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/691726.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C/C++练习】经典的快慢指针问题---移除元素

&#x1f4d6;题目描述 题目出处&#xff1a;移除元素 &#x1f516;示例 &#x1f4d6;题解 对于本题我将按照由易到难的顺序为大家分享三种解题思路&#xff0c;并逐一分析它们的优劣&#xff0c;以及注意事项。 &#x1f516;思路一&#xff1a;暴力求解 我想暴力求解应该…

零-云尚办公项目学习

对于云尚办公项目的学习 1、这是尚硅谷推出的新的OA项目 云尚办公系统是一套自动办公系统&#xff0c;系统主要包含&#xff1a;管理端和员工端 管理端包含:权限管理、审批管理、公众号菜单管理 员工端:采用微信公众号操作&#xff0c;包含&#xff1a;办公审批、微信授权登…

数字通信中的编码(学习笔记)

编码种类 RZ(Return Zero Code)编码 也称为归零码&#xff0c;就是在 一个周期内&#xff0c;用二进制传输数据位&#xff0c;在数据脉冲结束后&#xff0c;需要维持一段时间的低电平。 RZ编码又分为两种&#xff1a; 单极性归零码 低电平表示0&#xff0c;正电平表示1&…

【Java用法】windows10系统下修改jar中的文件并重新打包成jar文件然后运行

windows10系统下修改jar中的文件并重新打包成jar文件然后运行 一、背景描述二、操作步骤2.1 解压jar包2.2 修改配置文件2.3 重新打成jar包2.4 确认是否修改成功2.5 运行程序 一、背景描述 测试环境&#xff08;Linux&#xff09;的代码&#xff08;jar包&#xff09;拉取到本地…

AI数字人:语音驱动面部模型及超分辨率重建Wav2Lip-HD

1 Wav2Lip-HD项目介绍 数字人打造中语音驱动人脸和超分辨率重建两种必备的模型&#xff0c;它们被用于实现数字人的语音和图像方面的功能。通过Wav2Lip-HD项目可以快速使用这两种模型&#xff0c;完成高清数字人形象的打造。 项目代码地址&#xff1a;github地址 1.1…

可再生能源与能源存储技术的结合和互补

在全球对可再生能源的需求日益增长的背景下&#xff0c;如何将可再生能源与能源存储技术相结合&#xff0c;实现能源的高效利用和持续供应成为了一个重要的议题。本文将探讨可再生能源与能源存储技术的结合与互补关系&#xff0c;分析其对能源领域的影响以及未来发展的前景。 …

CSS常用样式

文章目录 字体样式文本样式颜色和背景样式对齐方式下划线、上划线、删除线设置行高 列表样式背景样式背景颜色背景图片背景重复背景大小 鼠标样式伪类样式设置透明度 字体样式 所有样式都写在<style>标签内&#xff0c;里面加选择器 <!DOCTYPE html> <html>…

别小看可拖拽式表单设计器,降本增效就靠它啦!

在经济快速发展的当下&#xff0c;办公已然进入流程化发展阶段。不少企业希望实现降本增效的办公效果&#xff0c;大家不妨可以了解下可拖拽式表单设计器。通过简单的拖拉拽就能实现应用组建&#xff0c;创建属于自己的快速开发框架平台&#xff0c;不仅省下培养专业程序人工的…

安科瑞电化学储能电能管理系统解决方案

1.概述 在我国新型电力系统中&#xff0c;新能源装机容量逐年提高&#xff0c;但是新能源比如光伏发电、风力发电是不稳定的能源&#xff0c;所以要维持电网稳定&#xff0c;促进新能源发电的消纳&#xff0c;储能将成为至关重要的一环&#xff0c;是分布式光伏、风电等新能源…

抖音本地生活团购软件开发

抖音本地生活团购软件开发需要考虑以下几个方面&#xff1a; 功能设计&#xff1a;根据本地生活团购服务特点&#xff0c;设计相应的功能模块&#xff0c;如商家入驻、商品展示、订单管理、支付等。 技术选型&#xff1a;选择适合该项目的技术和框架&#xff0c;如移动…

【MySQL经典练习题】1. 多列数据求最大值

用 SQL 从多行数据里选出最大值或最小值很容易——通过 GROUP BY 子句对合适的列进行聚合操作&#xff0c;并使用 MAX 或 MIN 聚合函数就可以求出。 那么&#xff0c;从多列数据里选出最大值该怎么做呢&#xff1f; 目录 1、建表SQL 2、查询SQL &#xff08;1&…

LabVIEW开发汽车装配挡风玻璃清洗机灌装机

LabVIEW开发汽车装配挡风玻璃清洗机灌装机 挡风玻璃清洗机灌装机用于填充车内的肥皂槽。该项目在汽车行业实施。可编程逻辑控制器用于许多类型的行业&#xff0c;它使系统灵活。以前使用继电器逻辑&#xff0c;但由于其局限性&#xff0c;用PLC代替了。PLC用于模拟和数字逻辑信…

Thymeleaf介绍及其在Spring Boot中的使用

&#x1f4d6; Thymeleaf简介 &#x1f4da; Thymeleaf的定义 Thymeleaf 是一款现代化的服务器端 Java 模板引擎&#xff0c;适用于 Web 和独立应用场景。它具备处理 HTML、XML、JavaScript、CSS 以及纯文本的能力。Thymeleaf 的核心目标是为开发者提供一种优雅且自然的模板设…

vue 图片上传到腾讯云对象存储组件封装(完善版)

vue 上传图片到腾讯云对象存储 1、 引入cos-js-sdk-v52、封装uploadcos.js3、封装图片上传组件、调用上传方法4、页面使用组件 之前总结过 vue 封装图片上传组件到腾讯云对象存储&#xff0c;后来又加了一些功能&#xff0c;在图片过大时进行压缩&#xff0c;压缩完成之后&…

基于NXP i.MX 6ULL——MQTT通信协议的开发案例

前 言 本指导文档适用开发环境&#xff1a; Windows开发环境&#xff1a;Windows 7 64bit、Windows 10 64bit Linux开发环境&#xff1a;Ubuntu 18.04.4 64bit 拟机&#xff1a;VMware15.1.0 U-Boot&#xff1a;U-Boot-2020.04 Kernel&#xff1a;Linux-5.4.70 Linux S…

深入理解ThreadPoolExecutor线程池工作原理源码解析

文章目录 0. 前言1. 生命周期管理1.1 创建1.2 执行1.2.1 任务执行入口1.2.2 addWorker解析1.2.3 Worker类解析 1.3 关闭1.4 终止阶段 2. 总结 0. 前言 背景&#xff1a;最近技术交流群里有个新同学&#xff0c;面试的时候被问到线程池相关的问题&#xff0c;答的不是很好&#…

LinkedIn领英如何创建公司主页?附领英产品专区创建方法

领英常见问题-如何创建公司主页&#xff1f; 领英不仅可以创建个人主页&#xff0c;还可以以企业的身份创建公司主页。 公司主页就相当于自己的官网&#xff0c;可以发布动态&#xff0c;展示公司信息&#xff0c;做官网外链&#xff0c;对公司来讲也是一种品牌形象宣传&…

PDF转换软件有哪些?分享免费好用的PDF转换工具!

PDF是在办公和学习中常用的文件格式&#xff0c;它包含文字、图片、数据等各种信息&#xff0c;可以说是功能丰富。然而&#xff0c;有时我们需要将PDF转换为PPT格式以便于演示&#xff0c;或者将其转换为Word格式以节省内存空间。这时候就需要使用PDF转换软件。下面我将介绍一…

20230618_ISP-pipeline-hdrplus_contrast

原理&#xff1a; global映射曲线&#xff0c;通过这个曲线控制黑的更黑&#xff0c;白的更白 b&#xff1a;黑电平 s&#xff1a;强度&#xff0c;值越大越接近yx&#xff1b;越小对比度越强 代码&#xff1a; 简单的映射表&#xff0c;没什么好讲的 效果&#xff1a; before&…

【Spring】设计思想

一、Spring 是什么&#xff1f; Spring是一个开源的Java框架&#xff0c;有着活跃而庞大的社区&#xff08;例如&#xff1a;Apache&#xff09;&#xff0c;Spring 提供了一系列的工具和库&#xff0c;可以帮助开发者构建高效、可靠、易于维护的企业级应用程序。Spring的核心…