【C++项目设计】tcmalloc高并发内存池

news2024/12/24 9:40:09

🧸🧸🧸各位大佬大家好,我是猪皮兄弟🧸🧸🧸
在这里插入图片描述

文章目录

  • 一、项目介绍
  • 二、池化技术与内存池
    • 池化技术
    • 内存池
    • 内存池需要解决的问题
  • 三、malloc
  • 四、定长内存池(了解内存池&&后面的小组件)
    • 结构定义&&自由链表
    • 避免使用malloc开辟空间
    • 定长内存池申请空间的逻辑
    • 定长内存池&&malloc、free性能分析
  • 五、高并发内存池整体框架设计
  • thread cache
    • thread cache整体设计
    • thread cache哈希桶映射对齐规则
    • 内碎片空间浪费率
    • ThreadCahce类
    • thread cache的TLS无锁访问(高并发的原因所在)
  • central cache
    • central cache 整体设计
    • central cache结构设计
    • span结构
    • span的双链表结构
    • central cache的结构
    • central cache核心实现
    • 满开始反馈调节算法
  • page cache
    • page cache整体设计
    • 在page cache获取一个n也的span过程
    • page cache的实现方式
    • page cache 中获取span
  • 六、申请内存过程联调
  • thread cache回收内存
  • central cache 回收内存
  • page cache 回收内存
  • 七、释放内存过程联调
  • 大于256KB的大块内存线程申请问题
  • 使用定长内存池脱离使用new
  • 释放内存时怎么优化成不需要传大小
  • 多线程环境下对比malloc测试
  • 性能瓶颈分析
  • 针对性能瓶颈使用基数树进行优化
  • 使用基数树进行优化代码实现
    • 读取基数树映射关系不需要加锁的原因
  • 项目源码

一、项目介绍

本项目是实现一个高并发的内存池,是把google的一个开源项目tcmalloc的核心给实现了下来,tcmalloc全称
Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,它是用于替代系统的内存
分配相关的函数(如malloc、free)。

  1. tcmalloc知名度是非常高的,Go语言就直接用它做了自己的内存分配器。
  2. 该项目是对tcmalloc中最核心的框架的一个简化,模拟实现出mini版的高并发内存池
  3. 该项目主要涉及C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、TLS线程局部存储、互斥锁、基数树等方面的技术。

二、池化技术与内存池

池化技术

池化技术 (Pool) 是一种很常见的编程技巧,在请求量大时能明显优化应用性能,降低系统频繁申请的资源开销。我们日常工作中常见的有数据库连接池、线程池、对象池等,它们的特点都是将 “昂贵的”、“费时的” 的资源维护在一个特定的 “池子” 中,规定其最小连接数、最大连接数、阻塞队列等配置,方便进行统一管理和复用,通常还会附带一些探活机制、强制回收、监控一类的配套功能。
提前保存大量的资源,以备不时之需以及重复使用。池化技术应用广泛,如内存池,线程池,连接池等等。
对连接或线程的复用,并对复用的数量、时间等进行控制,从而使得系统的性能和资源消耗达到最优状态。

内存池

内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接
向操作系统申请,而是直接从内存池中获取
;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

内存池需要解决的问题

内存池是需要我们自己去进行管理的,最难受受的就是它的碎片问题
内存碎片又分为内碎片和外碎片

内碎片:我们在使用过程中这部分空间申请了又无法被使用的空间
内碎片的问题都不是很严重,因为我们使用了之后,他还会还回来,
可能下一个人再使用的时候就不存在内碎片问题了。

外碎片:因为某些原因,有足够多的空间,但是因为这些空间的碎片化,导致无法申请的问题
外碎片才是最让人头疼的
在这里插入图片描述

三、malloc

在C/C++当中,我们是要去malloc空间的(new的底层也是malloc),但是注意了,我们调用malloc并不是直接向堆申请空间。malloc本身也是一个内存池。malloc只规定了他会完成什么功能,但他的实现方案有很多种,比如windows的VS系列就是自己写的一套方案。Linux的gcc用的glibc的ptmalloc
在这里插入图片描述

四、定长内存池(了解内存池&&后面的小组件)

malloc是一个通用的函数,在什么场景下都可以用,这就意味着,它的效率并不会太高
而我们根据场景下专门设计的内存池就会达到极致的性能
在这里插入图片描述

定长内存池
定长内存池其实就是预先申请一大部分空间,然后由我自己去管理。
我们可以根据类型去决定长度,也可以通过非类型模板参数去决定长度

定长内存池如何对不用的空间进行管理呢?

结构定义&&自由链表

我们的定长内存池需要3个字段

  1. _remainBytes用来表示我这个大数组中还有多少剩余没分配的空间
  2. _memory就是用来表示大块空间的起始地址的,用的时候我_memory+=size就可以了
  3. _freeList自由链表,用来回收内存
    在这里插入图片描述
template<class T>
class ObjectPool
{
private:
	char* _memory = nullptr;     //指向大块内存的指针
	size_t _remainBytes = 0;     //大块内存在切分过程中剩余字节数

	void* _freeList = nullptr;   //还回来过程中链接的自由链表的头指针
};

我们需要使用自由链表去管理还回来的空间:
自由链表也就是我们需要用头指针大小个字节去存储下一个结点的首地址。
而我们知道指针的大小与机器有关,32位下指针大小4字节,64位下指针大小8字节

  1. 我们可以采用条件编译的方式去决定指针的大小
  2. *(void**)ptr ,我们知道,指针类型的意义之一就是代表了一次解引用能访问多大的空间

所以,我们采用如下的方式进行自由链表的连接

static void*& NextObj(void* ptr)
{
	return (*(void**)ptr);
}

这也就决定了我们给对象开空间必须开4或者8字节以上的空间,那么这就有可能产生内碎片的问题,比如它像申请一字节的空间,但是我给了4字节,他也没用上。(虽然我们自由链表用了)

避免使用malloc开辟空间

因为我们tcmalloc设计出来本身就是要拿来替代malloc,所以我们需要避免使用malloc来开辟空间

1.在Windows下,我们可以选择采用VirtualMalloc()来直接向堆申请空间
2.在Linux下,我们可以选择brk()或者mmap()来向堆申请空间

#ifdef _WIN32
#include <Windows.h>
#else
	//...
#endif

//直接去堆上申请按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif
	if (ptr == nullptr)
		throw std::bad_alloc();
	return ptr;
}

//直接将内存还给堆
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
	VirtualFree(ptr, 0, MEM_RELEASE);
#else
	//linux下sbrk unmmap等
#endif
}

定长内存池申请空间的逻辑

申请空间
①先看看_freeList自由链表中有没有结点,有的话直接Pop弹出,拿来用就可以了
②没有的话就去_memory大块内存要
_memory不足够的话,也就是_remainBytes<size就找重新堆申请大块空间

所以定长内存池就可以简单实现了

template<class T>
class ObjectPool
{
public:
	//申请对象
	T* New()
	{
		T* obj = nullptr;
		//优先把还回来的内存块对象,再次重复利用
		if (_freeList != nullptr)
		{
			obj = (T*)_freeList;
			_freeList = NextObj(_freeList);
		}
		else
		{
			//指针个大小的地址
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			//剩余内存不够一个对象大小时,则重新开大块空间
			if (_remainBytes < objSize)
			{
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remainBytes);
				_memory = (char*)SystemAlloc(_remainBytes >> 13);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}
			//从大块内存中切出objSize字节的内存
			obj = (T*)_memory;
			_memory += objSize;
			_remainBytes -= objSize;
		}
		new(obj)T;		//定位new,申请的空间显示初始化
		return obj;
	}
	//释放对象
	void Delete(T* obj)
	{
		obj->~T();
		//将释放的对象头插到自由链表
		NextObj(obj) = _freeList;
		_freeList = obj;
	}
private:
	char* _memory = nullptr;     //大块内存起始地址
	size_t _remainBytes = 0;     //大块内存 剩余字节数
	void* _freeList = nullptr;   //自由链表
};

定长内存池&&malloc、free性能分析

我们通过多轮的n次申请和释放来进行malloc、free和定长内存池New和Delete来进行性能测试

struct treenode
{
	int _val;
	treenode* _left;
	treenode* _right;
	treenode()
		:_val(0)
		, _left(nullptr)
		, _right(nullptr)
	{}
};


void testobjectpool()
{
	// 申请释放的轮次
	const size_t rounds = 3;
	// 每轮申请释放多少次
	const size_t n = 1000000;
	std::vector<treenode*> v1;
	v1.reserve(n);

	//malloc和free
	size_t begin1 = clock();
	for (size_t j = 0; j < rounds; ++j)
	{
		for (int i = 0; i < n; ++i)
		{
			v1.push_back(new treenode);
		}
		for (int i = 0; i < n; ++i)
		{
			delete v1[i];
		}
		v1.clear();
	}
	size_t end1 = clock();

	//定长内存池
	objectpool<treenode> tnpool;
	std::vector<treenode*> v2;
	v2.reserve(n);
	size_t begin2 = clock();
	for (size_t j = 0; j < rounds; ++j)
	{
		for (int i = 0; i < n; ++i)
		{
			v2.push_back(tnpool.new());
		}
		for (int i = 0; i < n; ++i)
		{
			tnpool.delete(v2[i]);
		}
		v2.clear();
	}
	size_t end2 = clock();

	cout << "new cost time:" << end1 - begin1 << endl;
	cout << "object pool cost time:" << end2 - begin2 << endl;
}

因为是定长内存池,可以说在申请和释放的过程已经把性能拉到了极致,因为它并没有什么处理,就是申请到简单的内存后,内存地址的++ - -
在这里插入图片描述
就是因为malloc是通货,而我们定长内存池是专门针对申请定长对象而设计的,因此在这种特殊场景下的效率更高。

五、高并发内存池整体框架设计

现代很多的开发环境都是多线程的,在申请内存的场景下,必然存在激烈的锁竞争的问题。malloc本身已经很优秀,也是一个内存池,但是tcmalloc在多线程并发场景下会更胜一筹。我们实现的高并发内存池需要考虑以下几个方面

  1. 性能问题
  2. 多线程环境下,锁竞争问题(性能瓶颈)
  3. 内存碎片问题(内碎片&&外碎片)

高并发内存池Concurrent Memory Pool主要由以下三个部分构成:

  1. thread cache
  2. central cache
  3. page cache
    在这里插入图片描述

解释如下:
thread cache是每个 线程独有的(利用线程的TLS特性),用于小于256KB内存的分配,每个线程独享一个cache,这也是这个并发线程池高效的地方。
![在这里插入图片描述](https://img-blog.csdnimg.cn/47fa6af61f76439ebaf22f38eabeb470.png

central cache 中心缓存是所有线程共享的,thread cache按需向central cache获取对象,central cache在合适的时候回收部分对象(避免一个线程占用太多)。中心缓存起到了一个均衡调度的作用。因为central central不与其他桶进行交互的性质呢,我们选择采用桶锁
在这里插入图片描述

page cache,页缓存是在central cache上面的一层缓存,存储的内存是以页为单位进行分配的,central cache没有对象时,会向page cache申请多个页的跨度Span,然后申请到后进行切分成小块进行存储。在满足条件的时候进行回收。并采用分裂和合并的机制来达到更高效和缓解内存碎片的问题(外碎片)
在这里插入图片描述

其他的东西到时候再细说


thread cache

thread cache整体设计

定长内存池虽然效率已经很高很高,但是定长内存池只支持固定大小块的内存申请和释放。定长内存池中只需要一个自由链表管理释放回来的内存块。现在我们要支持申请和释放不同大小的内存块,那么我们就需要多个自由链表来管理释放回来的内存块,因此thread cache实际上一个哈希桶结构,每个桶中存放的都是一个自由链表。

不过我们设计的是只要是<=256KB的内存都找thread cache,但是如果是间隔1byte我们就要给一个桶来存储的话,那么就意味着我们需要20多万个自由链表来进行存储。光是存储这些自由链表的头指针就会消耗大量内存,显然是不可行的。

这时我们可以选择做一些平衡的牺牲,比如让这些字节数按某种大小进行对齐。由此,设计出以下的方案

	//[1,128]              8byte对齐       freelist[0,16)
	//[128+1,1024]         16byte对齐      freelist[16,72)
	//[1024+1,8*1024]      128byte对齐     freelist[72,128)
	//[8*1024+1,64*1024]   1024byte对齐    freelist[128,184)
	//[64*1024+1,256*1024] 8*1024byte对齐  freelist[184,208)

这样设计之后,会让整体的内碎片控制在10%左右。并且不会产生太多的自由链表头指针。减少空间的开销。我们用一点点可控的内碎片就换取了大量的空间,并且这些内碎片其实是构不成什么大问题的,用完了还要还回去。
在这里插入图片描述
因此,因为哈希桶大小的设计,我们在申请空间的时候,比如申请7KB,就会调用特定的算法,提升到8KB去申请(按照我们桶的大小去申请)。然后如果这个桶中有,直接头删一块拿去用就行,如果没有的话,就需要向上一层central cache申请大块内存了。


因为项目的复杂性,就需要有更好的封装去供我们调用,比如自由链表的插入和删除的封装
//自由链表封装,这是单个自由链表,thread cahce有多个
class FreeList
{
public:
	//将释放掉的小空间头插入FreeList
	void Push(void* obj)
	{
		assert(obj);
		//头插
		NextObj(obj) = _freeList;
		_freeList = obj;
	}

	//自由链表头部获取对象,头删
	void* Pop()
	{
		assert(_freeList);
		//头删
		void* obj = _freeList;
		_freeList = NextObj(_freeList);
		return obj;
	}

private:
	//自由链表头指针
	void* _freeList = nullptr; 
};

thread cache哈希桶映射对齐规则

为什么要进行对齐上面已经说了,每个字节都去搞个桶开销太大,我们需要制定合适的映射规则。

因为是自由链表,所以我们得保证最小的大小至少能存的下一个指针,而且又因为可移植性,我们要考虑32位和64位平台的情况,所以选择至少能存下8字节的大小。

但是如果都按8字节进行对齐的话,需要建立256 * 1024 / 8 = 32768个桶,这个数量还是很多的。所以我们又考虑不同范围的字节数按照不同的方式对齐。(因为这样可以控制内碎片的比例)

字节数对齐数哈希桶下标
[1,128]8[0,16)
[128+1, 1024]16[16,72)
[1024+1, 8*1024]128[72,128)
[81024+1, 641024]1024[128,184)
[641024+1, 2561024]8*1024[184,208)

内碎片空间浪费率

按照我们上面的对齐规则,我们可以把内碎片的空间浪费率控制到10%左右(1-128我们不考虑,因为太小,如果申请1字节就算只给他开辟2字节,浪费率也是50%)

浪费率 = 浪费的字节数 / 该对齐字节数

比如说129~1024,对齐数是16,那么最多浪费掉15个字节,所以 P = 15 / 144 ≈ 10.42%
再比如1024-8*1024, 对齐数128,那么最多浪费掉127字节,所以 P = 127 / 1152 ≈ 11.02%

所以我们能够把内碎片控制在10%左右,是非常可观的(因为我们大大减少了桶的数量)

ThreadCahce类

由上可知,我们thread cache哈希桶中桶的数量也就是自由链表的数量已经减少到了 208个,而且我们申请 0-256KB的内存都可以直接像ThreadCahce申请了。

所以给出类的定义


//<=256KB,向thread cache申请
//>256KB,向page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;//根据我们的设计

class ThreadCache
{
public:
	//申请内存对象
	void* Allocate(size_t size);

private:
	FreeList _freeLists[NFREELISTS]; //哈希桶
};

在thread cache申请对象时,所给的字节数通过算法算出对应的哈希桶下标。比如申请7字节,算法算出大小为8这个桶,也就是第一个桶。如果桶不为nullptr,那么获取对象即可,如果桶为nullptr,那么需要向上一层central cache获取

//申请内存对象
void* ThreadCache::Allocate(size_t size)
{
	assert(size <= MAX_BYTES);
	size_t alignSize = SizeClass::RoundUp(size);//对齐规则算出大小
	size_t index = SizeClass::Index(size);//根据大小找到哈希桶下标
	if (!_freeLists[index].Empty())
	{
		return _freeLists[index].Pop();
	}
	else
	{
		return FetchFromCentralCache(index, alignSize);
	}

thread cache的TLS无锁访问(高并发的原因所在)

在操作系统当中,我们知道C,C++多线程需要使用原生线程库,而对于线程,(linux)OS只给出了轻量级进程的概念。linux下的策略是唯一的,因为线程和进程的相似性,linux采用复用的方式来完成线程(没有独特的线程结构),而是采用多PCB(线程)跑在进程的进程地址空间上。

这也就让原生线程库帮我们区分线程和进程。所以也就肯定意味着原生线程库要帮我们做一部分线程的管理。而在原生线程库中,它管理的主要是线程id(地址的方式),线程局部存储TLS、线程私有栈。

所以每一个线程都有自己的一个独享的thread cache。这就是TLS线程局部存储。使用该存储方法的变量在它所在的线程是全局可访问的。因为是每个线程独有一份,所以保证了数据的独立性,所以我们就可以以无锁的方式去申请和释放空间。tcmalloc高并发的原因也就是在这里。

使用TLS方法

//TLS -- Thread Local Storage
static __declspec(thread) ThreadCache* pTLSThreadCache=nullptr;


//TLS,线程专属一个threadcache,无锁的申请和释放
if (pTLSThreadCache == nullptr)
{
	pTLSThreadCache = new ThreadCache;//new到TLS当中保存
}

central cache

central cache 整体设计

当线程申请thread cache的某个桶时,如果thread cace中对应的自由链表不为空,那么直接头删出一个内存快供线程使用,而如果此时自由链表为空,那么这是thread cache就需要向central cache申请内存了。

central cache的结构和thread cache是类似的,都是哈希桶的结构,并且遵循的映射规则都是类似的,这样做的好处就在于,当thread cache的某个桶中没有内存了,可以直接到central cache对应的哈希桶去取内存

不过呢,thread cache是每个线程独享的,而central cache因为要协调整个申请,释放的过程,所以是所有线程所共享的,每个线程没内存了都会去找central cache,所以呢,central cache需要加锁访问。而这时呢,因为我们central cache 和thread cache的同样大小的哈希桶设计,让我们只用桶锁就可以完成上锁,而不用锁住整个哈希桶。因为大小关系是对应起来的。并不会向其他大小的桶发起申请。

然后呢,central cache为了协调请求和释放。每个桶中挂的是一个一个的span,称之为跨度。
在这里插入图片描述
每个span管理的都是以页为单位的内存,每个桶里面的若干个span是按照双向循环链表的方式挂起来的。central cache当中的span里面是按照桶的大小切分好的。(提前说明:上一层page cache的span是没有切分的。)

central cache结构设计

每个程序运行起来之后都是进程,在32位平台下,进程地址空间的大小是232,在64位平台下,进程地址空间的大小就是 264

    而页的大小一般为4K或者8K,我们就以8K为标准。8K就是213,那么32位平台下,就被分成了232 / 213 = 219 个页,在64位下就是被分成 264 / 213 = 251个页,页号本质与地址是一样的,只是单位不一样罢了。

    因为页号在不同平台的取值范围不同,所以我们不能简单的使用一个无符号整型来存储页号,这时我们需要借助条件编译来解决

//需要把_WIN64写在前面,因为64位平台,
//_WIN64和_WIN32的定义都有,所以要先让他ifdef _WIN64
#ifdef _WIN64
	typedef unsigned long long PAGE_ID;
#elif _WIN32
	typedef size_t PAGE_ID;
#else
	//linux
#endif

span结构

    我们知道central cache的每个桶挂的都是一个个的span,span是一个管理以页为单位的大块内存块,span的结构如下:

//管理以页为单位的大块内存
struct Span
{
	PAGE_ID _pageId = 0;//起始页的页号
	size_t _n = 0;//页的数量
	
	Span* _next = nullptr;//双链表结构
	Span* _prev = nullptr;

	void* _freeList = nullptr;
	//切好的小块内存的自由链表
	size_t _useCount = 0;       
	//切好的小块内存分配给thread cache的计数
};

说明:
    对于span管理的以页为单位的大块内存,我们就用起始页号和页的数量来标定,因为页号和进程地址空间的地址其实是对应的,比如页号为1就代表了0-8K的这一块内存。,这也便于后面的合并,合并就是看页号连不连续。

    为了方便thread cache的申请,我们之前说了需要在central cache中,把申请下来的span里面按照桶的大小做一个切分,切分成自由链表,后序直接从自由链表头部拿n个去使用就可以了。

    _useCount就是管理这个span中被使用的小块有多少,因为不仅要申请,还要释放,当_useCount计数器变为0时代表当前span切出去的内存块全部回来了,那么这时也就说明负载其实不高了,就可以把这一块还给page cache。

    每个桶中的span是以双链表的形式组织起来的,因为我们后序要把span还给page cache,就会涉及到找到前后的span,然后移除我自己。单链表的话时间复杂度就比较高。

span的双链表结构

根据上面的描述,central cache的每个哈希桶里面存的是span的双链表结构,我们可以对它进行封装,比如申请下来的span我们进行Insert,释放span我们进行Erase。然后呢就是每个SpanList我们都要加一个桶锁

//带头双向循环链表
class SpanList
{
public:
	SpanList()
	{
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	}
	void Insert(Span* pos, Span* newSpan)
	{
		assert(pos);
		assert(newSpan);

		Span* prev = pos->_prev;

		prev->_next = newSpan;
		newSpan->_prev = prev;

		newSpan->_next = pos;
		pos->_prev = newSpan;
	}
	void Erase(Span* pos)
	{
		assert(pos);
		assert(pos != _head); //不能删除哨兵位的头结点

		Span* prev = pos->_prev;
		Span* next = pos->_next;

		prev->_next = next;
		next->_prev = prev;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶锁
};

central cache的结构

所以呢,我们central cache就是一个和thread cache对齐方式相同的哈希桶,只不过保存的是一个个span,里面是按照对齐方式切分好的。

class CentralCache
{
public:
	//...
private:
	SpanList _spanLists[NFREELISTS];
	//NFREELISTS就是我们之前定义的宏,会按照对齐方式产生多少个桶
};

central cache核心实现

    central cache 在整个进程当中只有一个,对于这种只能创建一个对象的类,我们可以将其设置为单例模式。

    单例模式可以保证系统中该类只会有一个实例。并会提供一个全局的访问点,该实例被所有模块共享。单例模式又分为饿汉和懒汉两种模式,懒汉相对复杂,我们这里使用饿汉就足够了。(饿汉在main开始之前就会存在,所以也就不会有线程安全问题(指的是不会向懒汉那样使用时才去创建,而又会同时使用。))

//central cache单例模式
class CentralCache
{
public:
	//提供一个全局访问点
	static CentralCache* GetInstance()
	{
		return &_sInst;
	}
private:
	SpanList _spanLists[NFREELISTS];
private:
	CentralCache() //构造函数私有
	{}
	CentralCache(const CentralCache&) = delete; 
	CentralCache& operator=(const CentralCache&) = delete;
	//防拷贝

	static CentralCache _sInst;
};

因为单例模式,我们要控制创建对象,那么首先要做的就是构造函数私有化和防拷贝

CentralCache类当中还需要有一个CentralCache类型的静态的成员变量,当程序运行起来后我们就立马new该对象,在此后的程序中就只有这一个单例了。

然后我们提供一个静态成员函数来让外部获取该对象

满开始反馈调节算法

当thread cache向central cache申请内存是,central cache一次应该给出多少小块内存供thread cache申请。

如果给的太少,那么短时间用完了之后又会来申请,一次给的太多了,有用不完浪费、所以我们这里采用了一个慢开始调节算法。

我们可以设置最少给2个内存块,最大给512个内存块

//管理对齐和映射等关系
class SizeClass
{
public:
	//thread cache一次从central cache获取对象的上限
	static size_t NumMoveSize(size_t size)
	{
		assert(size > 0);
	
		//对象越小,计算出的上限越高
		//对象越大,计算出的上限越低
		int num = MAX_BYTES / size;
		if (num < 2)
			num = 2;
		if (num > 512)
			num = 512;
	
		return num;
	}
};

但是一次如果给出512个还是很多的,所以我们可以再维护一个成员变量_maxSize,该变量初始值为1,并且提供一个公有的函数去获取这个变量。所以thread cache中每个自由链表都会有自己的_maxSize;

class FreeList
{
public:
	size_t& MaxSize()
	{
		return _maxSize;
	}

private:
	void* _freeList = nullptr; //自由链表
	size_t _maxSize = 1;
};

此时当thread cache再来申请的时候。我们就选择Min(NumMoveSize(),MaxSize());如果采用的是_maxSize,那么对_maxSize进行加一;

所以,thread cache第一次向central cache申请某大小的对象时,申请到的都是一个,但下一次thread cache再向central cache申请同样大小的对象时,因为该自由链表中的_maxSize增加了,最终就会申请到两个。直到该自由链表中_maxSize的值,增长到超过计算出的值后就不会继续增长了,此后申请到的对象个数就是NumMoveSize()计算出的个数。(这有点像网络中拥塞控制的机制,不过拥塞控制前面是以指数的方式增长的,而且到达阈值之后还会线性增长,只能说相似。)

从central cache获得内存块

//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)//哪个桶(算法算出来),多少个
{
	//慢开始反馈调节算法
	//1、最开始不会一次向central cache一次批量要太多
	//2、如果你不断有size大小的内存需求,那么——_batchNum就会不断增长,直到上限
	size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));//选择两个之中的最小
	if (batchNum == _freeLists[index].MaxSize())
	{
		_freeLists[index].MaxSize() += 1;
	}
	void* start = nullptr;
	void* end = nullptr;
	size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
	assert(actualNum >= 1); //至少有一个

	if (actualNum == 1) //申请到对象的个数是一个,则直接将这一个对象返回即可
	{
		assert(start == end);
		return start;
	}
	else //申请到对象的个数是多个,还需要将剩下的对象挂到thread cache中对应的哈希桶中,因为只用一个
	{
		_freeLists[index].PushRange(NextObj(start), end);
		return start;
	}
}

从central cache的某个span中获取一定数量的内存块

这里我们要从central cache的span中获取n个指定大小的对象。因为是span当中的取出来的这n个对象是链接在一起的(通过自由链表),我们只需要得到这段链表的头和尾即可,这里可以采用输出型参数进行获取。

//从central cache获取一定数量的内存块给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock(); //加桶锁
	
	//找到一个非空的span
	Span* span = GetOneSpan(_spanLists[index], size);
	assert(span); //span不为空
	assert(span->_freeList); 

	//从span中获取n个内存块
	//如果不够n个,有多少拿多少
	start = span->_freeList;
	end = span->_freeList;
	size_t actualNum = 1;
	while (NextObj(end)&&n - 1)
	{
		end = NextObj(end);
		actualNum++;
		n--;
	}
	span->_freeList = NextObj(end); //取完后剩下的内存块放回自由链表
	NextObj(end) = nullptr; //取出的一段链表的表尾置空
	span->_useCount += actualNum; //更新被分配给thread cache的计数

	_spanLists[index]._mtx.unlock(); //解锁
	return actualNum;
}

加桶锁的原因
①central cache为所有线程共享,比如加锁来保证临界资源的安全。
②是按照哈希桶特定的对齐方式来访问的,该线程申请内存的时候不会影响到其他桶

在向central cache获取对象时,先是在central cache对应的哈希桶中获取到一个非空的span,然后从这个span的自由链表中取出n个对象即可,但可能这个非空的span的自由链表当中对象的个数不足n个,这时该自由链表当中有多少个对象就给多少就行了。不够的话反正下一次还要来拿,也就不要写一些复杂的逻辑了。

向thread cache自由链表中插入我们申请到了一段自由链表

//管理切分好的小对象的自由链表
class FreeList
{
public:
	//插入一段范围的对象到自由链表
	void PushRange(void* start, void* end)
	{
		assert(start);
		assert(end);

		//头插
		NextObj(end) = _freeList;
		_freeList = start;
	}
private:
	void* _freeList = nullptr; //自由链表
	size_t _maxSize = 1;
};

page cache

page cache整体设计

central cache当中的span是从page cache当中申请到了,显而易见的,page cache当中也是存储的一个一个的span,只不过和central cache当中不同,没有经过切分。

page cache结构

在这里插入图片描述

首先,page cache的哈希桶采用的是直接定址法,比如一号通挂的都是1页的span,二号桶挂的是2页的span,依次类推,最大的是128号桶,挂128页的span

其次呢,page cache当中存储的是从堆中申请下来的整页的span,按照桶代表的大小切分是central cache干的事,因为central cache服务的是thread cache,它需要这么做。但是我们的page cache不同,保存的就是整页

而为了让桶号和页号对应起来,我们就可以空出0号桶来优化直接定址

//page cache中哈希桶的个数
static const size_t NPAGES = 129;

那为什么选择最大是128页的span呢?
    因为线程申请的最大空间的256KB,而128页可以被切成4个256KB的对象,已经是足够了。当然,你如果想挂更大的,也是可以的,根据具体的需求进行修改就可以了。


在page cache获取一个n也的span过程

现在的情况是thread cache向central cache申请,但是申请到了空的span,就表明central cache中没有来,所以central cache就会去找page cache去要span下来(然后central cache自己对他作切分),然后挂在对应的哈希桶上。

page cache当中,采用了分裂和合并的思想,分裂是对于申请而言的。比如central cache想要申请一个2页的span,有的话就返回,没有的话就会继续找下一个桶,如果找到128页的桶都还没有,page cache就要向堆申请大块内存了。而如果找到了,比如找到了64页的,那么就会切分成两块,把2页的给central cache,把剩下的62页的挂在62页的桶上


page cache的实现方式

因为每个线程都可能去向central cache进行申请,虽然central cache只有一个,但是因为它加的是桶锁,所以就可能这多个线程申请的桶再同时向page cache申请,对page cache来说相当于是一个多线程。又因为采用的是分裂和合并的思想,所以是涉及到其他桶的,所以我们申请page cache的时候要给整个哈希桶上锁。(如果说使用桶锁也没关系,但是就会出现大量频繁的加锁和解锁,导致程序的效率低下),因此采用的是锁住整个哈希桶。

page cache的访问也是比较频繁的,特别是在前期。所以不管是加桶锁还是给整个哈希桶加锁都是会大大影响效率的。所以后面想办法优化。

此外,page cache再整个进程中也是唯一存在,所以设置单例模式

//单例模式
class PageCache
{
public:
	//提供一个全局访问点
	static PageCache* GetInstance()
	{
		return &_sInst;
	}
private:
	SpanList _spanLists[NPAGES];
	std::mutex _pageMtx; //大锁
private:
	PageCache() //构造函数私有
	{}
	PageCache(const PageCache&) = delete; //防拷贝
	PageCache& operator=(const PageCache&) =delete;

	static PageCache _sInst;
};

page cache 中获取span

thraed cache 向central cache进行申请,central cache需要先从对于的哈希桶中 获取非空span,然后取出若干个内存块返回给thread cache。也就是找到对应的桶,然后遍历,查看哪个是非空的。

我们可以模拟迭代器给出Begin()和End()标识边界

//带头双向循环链表
class SpanList
{
public:
	Span* Begin()
	{
		return _head->_next;
	}
	Span* End()
	{
		return _head;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶锁
};

如果遍历到了End()结尾还没有非空span,就要向page cache申请了。

关于申请多大页数的span:
我们可以根据thread cache申请的个数来看,我们可以知道申请了多少字节,我们就可以转化为页数,就知道了需要多少页,不够就ceiling向上取整。

//管理对齐和映射等关系
class SizeClass
{
public:
	//central cache一次向page cache获取多少页
	static size_t NumMovePage(size_t size)
	{
		size_t num = NumMoveSize(size); //计算出thread cache一次向central cache申请对象的个数上限
		size_t nPage = num*size; //num个size大小的对象所需的字节数

		nPage >>= PAGE_SHIFT; //将字节数转换为页数
		//PAGE_SHIFT代表页大小转换偏移,比如一页是8K的话,PAGE_SHIFT就是13
		if (nPage == 0) //至少给一页
			nPage = 1;

		return nPage;
	}
};

当central cache申请到若干页的span后,还需要将这个span切成一个个对应大小的对象挂到该span的自由链表当中。(按照桶对应的带下)

    如何找到一个span所管理的内存块呢?首先需要计算出该span的起始地址,我们可以用这个span的起始页号乘以一页的大小即可得到这个span的起始地址,然后用这个span的页数乘以一页的大小就可以得到这个span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置。

    明确了这块内存的起始和结束位置后,我们就可以进行切分了。根据所需对象的大小,每次从大块内存切出一块固定大小的内存块尾插到span的自由链表中即可。

    为什么是尾插呢?因为我们如果是将切好的对象尾插到自由链表,这些对象看起来是按照链式结构链接起来的,而实际它们在物理上是连续的,这时当我们把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存利用率。(命中的问题,局部性原理)

//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
	//1.在spanList寻找非空span
	Span* it = spanList.Begin();
	while (it != spanList.End())
	{
		if (it->_freeList != nullptr)
		{
			return it;
		}
		else
		{
			it = it->_next;
		}
	}

	//2、spanList中没有非空的span
	//只能向page cache申请
	Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
	
	//计算span的大块内存的起始地址和大块内存的大小(字节数)
	char* start = (char*)(span->_pageId << PAGE_SHIFT);
	size_t bytes = span->_n << PAGE_SHIFT;

	//把大块内存切成size大小的对象链接起来
	char* end = start + bytes;
	
	//先切一块下来去做尾,方便尾插
	span->_freeList = start;
	start += size;
	void* tail = span->_freeList;
	//切分,尾插
	while (start < end)
	{
		NextObj(tail) = start;
		tail = NextObj(tail);
		start += size;
	}
	NextObj(tail) = nullptr; //尾的指向置空
	
	//将切好的span头插到spanList
	spanList.PushFront(span);

	return span;
}

需要注意的是,当我们把span切好后,需要将这个切好的span挂到central cache的对应哈希桶中。因此SpanList类还需要提供一个接口,用于将一个span插入到该双链表中。这里我们选择的是头插,这样当central cache下一次从该双链表中获取非空span时,一来就能找到。

由于SpanList类之前实现了Insert和Begin函数,这里实现双链表头插就非常简单,直接在双链表的Begin位置进行Insert即可。

//带头双向循环链表
class SpanList
{
public:
	void PushFront(Span* span)
	{
		Insert(Begin(), span);
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶锁
};

从page cache获取一个k页的span

当我们调用上述的GetOneSpan从central cache的某个哈希桶获取一个非空的span时,如果遍历哈希桶中的双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就需要向page cache申请若干页的span了,下面我们就来说说如何从page cache获取一个k页的span。

因为page cache是直接按照页数进行映射的,因此我们要从page cache获取一个k页的span,就应该直接先去找page cache的第k号桶,如果第k号桶中有span,那我们直接头删一个span返回给central cache就行了。所以我们这里需要再给SpanList类添加对应的Empty和PopFront函数。

//带头双向循环链表
class SpanList
{
public:
	bool Empty()
	{
		return _head == _head->_next;
	}
	Span* PopFront()
	{
		Span* front = _head->_next;
		Erase(front);
		return front;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶锁
};

如果page cache的第k号桶中没有span,我们就应该继续找后面的桶,只要后面任意一个桶中有一个n页span,我们就可以将其切分成一个k页的span和一个n-k页的span,然后将切出来k页的span返回给central cache,再将n-k页的span挂到page cache的第n-k号桶即可。(分裂)

但如果后面的桶中也都没有span,此时我们就需要向堆申请一个128页的span了,在向堆申请内存时,直接调用我们封装的SystemAlloc函数即可。(直接向堆申请内存 Win下VirtualMalloc,Linux下brk()和mmap())

需要注意的是,向堆申请内存后得到的是这块内存的起始地址,此时我们需要将该地址转换为页号。由于我们向堆申请内存时都是按页进行申请的,因此我们直接将该地址除以一页的大小即可得到对应的页号。

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		return _spanLists[k].PopFront();
	}
	//检查一下后面的桶里面有没有span,如果有可以将其进行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的头部切k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//将剩下的挂到对应映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			return kSpan;
		}
	}
	
	//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//尽量避免代码重复,递归调用自己
	return NewSpan(k);
}

    当我们向堆申请到128页的span后,需要将其切分成k页的span和128-k页的span,但是为了尽量避免出现重复的代码,我们最好不要再编写对应的切分代码。我们可以先将申请到的128页的span挂到page cache对应的哈希桶中,然后再递归调用该函数就行了,此时在往后找span时就一定会在第128号桶中找到该span,然后进行切分。

这里其实有一个问题:当central cache向page cache申请内存时,central cache对应的哈希桶是处于加锁的状态的,那在访问page cache之前我们应不应该把central cache对应的桶锁解掉呢?

这里建议在访问page cache前,先把central cache对应的桶锁解掉。虽然此时central cache的这个桶当中是没有内存供其他thread cache申请的,但thread cache除了申请内存还会释放内存,如果在访问page cache前将central cache对应的桶锁解掉,那么此时当其他thread cache想要归还内存到central cache的这个桶时就不会被阻塞。

因此在调用NewSpan函数之前,我们需要先将central cache对应的桶锁解掉,然后再将page cache的大锁加上,当申请到k页的span后,我们需要将page cache的大锁解掉,但此时我们不需要立刻获取到central cache中对应的桶锁。因为central cache拿到k页的span后还会对其进行切分操作,因此我们可以在span切好后需要将其挂到central cache对应的桶上时,再获取对应的桶锁。

六、申请内存过程联调

ConcurrentAlloc()函数

对外提供一个Concurrent函数,用户内存的申请。每一个线程第一次调用时会通过TLS获取到自己专属的thread cache对象,比较TLS是维护在原生线程库当中的。然后每个线程就可以通过自己的thread cache申请对象了

static void* ConcurrentAlloc(size_t size)
{
	//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
	if (pTLSThreadCache == nullptr)
	{
		pTLSThreadCache = new ThreadCache;
	}
	return pTLSThreadCache->Allocate(size);
}

在编译的过程中出现的问题:我们在调用min去慢开始调节算法的时候。报错了,愿意是C++的algorithm中也有min这是一个模板,而Windows.h当中也有min,这是一个宏,而编译器优先选择了Windows.h下宏形式的min,但是我们的调用方式是std::min,那么这时候就会报错,因为Windows下没有,所以我们只能将std去掉。这也看出来没有命名空间进行封装的坏处,很可能重名

申请内存过程联调测试一

由于在多线程场景下调试起来观察特别麻烦,因为是并发,一会儿CPU又在调用这个线程,一会儿又调用那个线程,调用我们观察的时候,函数乱跳。这里就先不考虑多线程了。单线程跑通了再说。

下面该线程进行了三次内存申请,这三次内存申请的字节数最终都对齐到了8,此时当线程申请内存时就只会访问到thread cache的第0号桶。

void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);

当线程第一次申请内存时,该线程需要通过TLS获取到自己专属的thread cache对象,然后通过这个thread cache对象进行内存申请。

在这里插入图片描述
在申请内存时通过计算索引到了thread cache的第0号桶,但此时thread cache的第0号桶中是没有对象的,因此thread cache需要向central cache申请内存块。
在这里插入图片描述
在向central cache申请内存块前,首先通过NumMoveSize函数计算得出,thread cache一次最多可向central cache申请8字节大小对象的个数是512,但由于我们采用的是慢开始算法,因此还需要将上限值与对应自由链表的_maxSize的值进行比较,而此时对应自由链表_maxSize的值是1,所以最终得出本次thread cache向central cache申请8字节对象的个数是1个。(慢开始反馈调节算法计算申请内存块个数

并且在此之后会将该自由链表中_maxSize的值进行自增,下一次thread cache再向central cache申请8字节对象时最终申请对象的个数就会是2个了。
在这里插入图片描述
在thread cache向central cache申请对象之前,需要先将central cache的0号桶的锁加上,然后再从该桶获取一个非空的span。
在这里插入图片描述
在central cache的第0号桶获取非空span时,先遍历对应的span双链表,看看有没有非空的span,此时肯定是没有的
在这里插入图片描述
那么此时central cache就需要向page cache申请内存了,但在此之前需要先把central cache第0号桶的锁解掉,然后再将page cache的大锁给加上,之后才能向page cache申请内存。
在这里插入图片描述
在向page cache申请内存时,由于central cache一次给thread cache8字节对象的上限是512,对应就需要4096字节,所需字节数不足一页就按一页算,所以这里central cache就需要向page cache申请一页的内存块。
在这里插入图片描述

但此时page cache的第1个桶以及之后的桶当中都是没有span的,因此page cache需要直接向堆申请一个128页的span。
在这里插入图片描述
这里通过监视窗口可以看到,用于管理申请到的128页内存的span信息。
在这里插入图片描述
现在将申请到的128页的span插入到page cache的第128号桶当中,然后再调用一次NewSpan,在这次调用的时候,虽然在1号桶当中没有span,但是在往后找的过程中就一定会在第128号桶找到一个span。
在这里插入图片描述
此时我们就可以把这个128页的span拿出来,切分成1页的span和127页的span,将1页的span返回给central cache,而把127页的span挂到page cache的第127号桶即可。

在这里插入图片描述
从page cache返回后,就可以把page cache的大锁解掉了,但紧接着还要将获取到的1页的span进行切分,因此这里没有立刻重新加上central cache对应的桶锁。因为我们应该尽量的压缩临界区的大小,提高效率,因为是多线程

在这里插入图片描述
在进行切分的时候,先通过该span的起始页号得到该span的起始地址,然后通过该span的页数得到该span所管理内存块的总的字节数。(起始地址+总字节数)
在这里插入图片描述
在确定内存块的开始和结束后,就可以将其切分成一个个8字节大小的对象挂到该span的自由链表中了。在调试过程中通过内存监视窗口可以看到,切分出来的每个8字节大小的对象的前四个字节存储的都是下一个8字节对象的起始地址。
在这里插入图片描述
当切分结束后再获取central cache第0号桶的桶锁,然后将这个切好的span插入到central cache的第0号桶中,最后再将这个非空的span返回,此时就获取到了一个非空的span。
在这里插入图片描述
由于thread cache只向central cache申请了一个对象,因此拿到这个非空的span后,直接从这个span里面取出一个对象即可,此时该span的_useCount也由0变成了1。
在这里插入图片描述由于此时thread cache实际只向central cache申请到了一个对象,因此直接将这个对象返回给线程即可。
在这里插入图片描述


第二次申请内存块

当线程第二次申请内存块时就不会再创建thread cache了,因为第一次申请时就已经创建好了,此时该线程直接获取到对应的thread cache进行内存块申请即可。
在这里插入图片描述
 当该线程第二次申请8字节大小的对象时,此时thread cache的0号桶中还是没有对象的,因为第一次thread cache只向central cache申请了一个8字节对象,因此这次申请时还需要再向central cache申请对象。
在这里插入图片描述
这时thread cache向central cache申请对象时,thread cache第0号桶中自由链表的_maxSize已经慢增长到2了,所以这次在向central cache申请对象时就会申请2个。如果下一次thread cache再向central cache申请8字节大小的对象,那么central cache会一次性给thread cache3个,这就是所谓的慢增长。
在这里插入图片描述

但由于第一次central cache向page cache申请了一页的内存块,并将其切成了1024个8字节大小的对象,因此这次thread cache向central cache申请2两个8字节的对象时,central cache的第0号桶当中是有对象的,直接返回两个给thread cache即可,而不用再向page cache申请内存了。

但线程实际申请的只是一个8字节对象,因此thread cache将一个对象返回,其他的留在桶中
在这里插入图片描述


第三次申请

这样一来,当线程第三次申请1字节的内存时,由于1字节对齐后也是8字节,此时thread cache也就不需要再向central cache申请内存块了,直接将第0号桶当中之前剩下的一个8字节对象返回即可。
在这里插入图片描述

申请内存过程联调测试二

为了进一步测试代码的正确性,我们可以做这样一个测试:让线程申请1024次8字节的对象,然后通过调试观察在第1025次申请时,central cache是否会再向page cache申请内存块。

for (size_t i = 0; i < 1024; i++)
{
	void* p1 = ConcurrentAlloc(6);
}
void* p2 = ConcurrentAlloc(6);

因为central cache第一次就是向page cache申请的一页内存,这一页内存被切成了1024个8字节大小的对象,当这1024个对象全部被申请之后,再申请8字节大小的对象时central cache当中就没有对象了,此时就应该向page cache申请内存块。

通过调试我们可以看到,第1025次申请8字节大小的对象时,central cache第0号桶中的这个span的_useCount已经增加到了1024,也就是说这1024个对象都已经被线程申请了,此时central cache就需要再向page cache申请一页的span来进行切分了。
  在这里插入图片描述
而这次central cache在向page cache申请一页的内存时,page cache就是将127页span切分成了1页的span和126页的span了,然后central cache拿到这1页的span后,又会将其切分成1024块8字节大小的内存块以供thread cache申请。
在这里插入图片描述

thread cache回收内存

当某个线程申请的内存块不用了,就可以将其释放给thread cache,也就是直接找到对应的桶,挂在自由链表里面即可。

随着系统负荷的减少,线程也会不断的释放,对饮自由链表的长度也会越来越长,堆积在thread cache当中就是一种浪费,因为释放多了本身就代表这部分内存不是很需要了对于这个线程来说,那么我们就应该把它还给central cache,然后central cache来均衡调度,决定是要分给需要的线程还是继续向上进行释放。

我们选择的策略是自由链表到达一定长度就进行释放(比如大于我们一次性向申请的长度)

//thread cache释放内存
void ThreadCache::Deallocate(void* ptr, size_t size)
{
	assert(ptr);
	assert(size <= MAX_BYTES);

	//找出对应的自由链表桶将对象插入
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);

	//当自由链表长度大于一次批量申请的对象个数时就开始还一段list给central cache
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
	{
	
	//释放给	central cache
	ListTooLong(_freeLists[index], size);
	}
}

自由链表的长度大于一次批量申请的对象时,我们具体的做法就是,从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给central cache中对应的span即可。

而还给central cache这里需要注意的是,不是找到对应大小的桶就随便找一个span还了,每一个span都有对应的起始页号和页的数量,这就可以得出一个地址区间。而我们进行归还就是要找在哪个地址区间。(这是在为归还给page cache做准备。而且一个span的定义本来就是多少页的连续大空间)

//释放对象导致链表过长,回收内存到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
	void* start = nullptr;
	void* end = nullptr;
	
	//从list中取出一次批量个数的对象
	list.PopRange(start, end, list.MaxSize());
	
	//将取出的对象还给central cache中对应的span
	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

从上述代码可以看出,FreeList类需要支持用Size函数获取自由链表中对象的个数,还需要支持用PopRange函数从自由链表中取出指定个数的对象。因此我们需要给FreeList类增加一个对应的PopRange函数,然后再增加一个_size成员变量,该成员变量用于记录当前自由链表中对象的个数,当我们向自由链表插入或删除对象时,都应该更新_size的值。

//管理切分好的小对象的自由链表
class FreeList
{
public:
	//将释放的对象头插到自由链表
	void Push(void* obj)
	{
		assert(obj);

		//头插
		NextObj(obj) = _freeList;
		_freeList = obj;
		_size++;
	}
	//从自由链表头部获取一个对象
	void* Pop()
	{
		assert(_freeList);

		//头删
		void* obj = _freeList;
		_freeList = NextObj(_freeList);
		_size--;

		return obj;
	}
	//插入一段范围的对象到自由链表
	void PushRange(void* start, void* end, size_t n)
	{
		assert(start);
		assert(end);

		//头插
		NextObj(end) = _freeList;
		_freeList = start;
		_size += n;
	}
	//从自由链表获取一段范围的对象
	void PopRange(void*& start, void*& end, size_t n)
	{
		assert(n <= _size);

		//头删
		start = _freeList;
		end = start;
		for (size_t i = 0; i < n - 1;i++)
		{
			end = NextObj(end);
		}
		_freeList = NextObj(end); //自由链表指向end的下一个对象
		NextObj(end) = nullptr; //取出的一段链表的表尾置空
		_size -= n;
	}
	bool Empty()
	{
		return _freeList == nullptr;
	}
	size_t& MaxSize()
	{
		return _maxSize;
	}
	size_t Size()
	{
		return _size;
	}
private:
	void* _freeList = nullptr; //自由链表
	size_t _maxSize = 1;
	size_t _size = 0;
};

而对于FreeList类当中的PushRange成员函数,我们最好也像PopRange一样给它增加一个参数,表示插入对象的个数,不然我们这时还需要通过遍历统计插入对象的个数。

因此之前在调用PushRange的地方就需要修改一下,而我们实际就在一个地方调用过PushRange函数,并且此时插入对象的个数也是很容易知道的。当时thread cache从central cache获取了actualNum个对象,将其中的一个返回给了申请对象的线程,剩下的actualNum-1个挂到了thread cache对应的桶当中,所以这里插入对象的个数就是actualNum-1。

_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);

说明:
  这里在设计PopRange接口时是取出指定个数的对象,因为在某些情况下当自由链表过长时,我们可能并不一定想把链表中全部的对象都取出来还给central cache,这样设计就是为了增加代码的可修改性。

我们判断thread cache是否应该还对象给central cache时,还可以综合考虑每个thread cache整体的大小。比如当某个thread cache的总占用大小超过一定阈值时,我们就将该thread cache当中的对象还一些给central cache,这样就尽量避免了某个线程的thread cache占用太多的内存。对于这一点,在tcmalloc当中就是考虑到了的。


central cache 回收内存

如何找到对应的span

我们已经可以通过地址来算出属于哪个页了,那我们怎么去找到对应的Span呢,难道我们需要去central遍历某个桶吗,那太慢了,所以我们在PageCache中存储了一个unordered_map来存储页号和Span*的对应关系,就可以很轻松的找到span

//单例模式
class PageCache
{
public:
	//获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);
private:
	std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
//将一定数量的对象还给对应的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock(); //加锁
	while (start)
	{
		void* next = NextObj(start); //记录下一个
		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		//将对象头插到span的自由链表
		NextObj(start) = span->_freeList;
		span->_freeList = start;

		span->_useCount--; //更新被分配给thread cache的计数
		if (span->_useCount == 0) //说明这个span分配出去的对象全部都回来了
		{
			//此时这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并
			_spanLists[index].Erase(span);
			span->_freeList = nullptr; //自由链表置空
			span->_next = nullptr;
			span->_prev = nullptr;

			//释放span给page cache时,使用page cache的锁就可以了,这时把桶锁解掉
			_spanLists[index]._mtx.unlock(); //解桶锁
			PageCache::GetInstance()->_pageMtx.lock(); //加大锁
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock(); //解大锁
			_spanLists[index]._mtx.lock(); //加桶锁
		}

		start = next;
	}

	_spanLists[index]._mtx.unlock(); //解锁
}

thread cache将内存块还回来之后,因为可能它申请的属于不同span,所以还给不同的span,不过span当中又_useCount的计数器,当计数器减到0之后,就表明都还回来了。

那么此时就说明负荷不高,我们应该还些内存给page cache。

我们将一个小内存块还给span的时候是不需要管是怎么换回来,直接连到后面的自由链表就可以了,因为只要_usecount减为0,那么就代表这是一整块内存,自由链表表示的是里面存放的值是什么,所以我们整块内存都有了,没必要在乎里面存的值

并且在central cache还span给page cache时也存在锁的问题,此时需要先将central cache中对应的桶锁解掉,然后再加上page cache的大锁之后才能进入page cache进行相关操作,当处理完毕回到central cache时,除了将page cache的大锁解掉,还需要立刻获得central cache对应的桶锁,然后将还未还完对象继续还给central cache中对应的span。


page cache 回收内存

如果central cache中有某个span的_useCount减到0了,那么central cache就需要将这个span还给page cache了。

但是因为page cache的分裂和合并的思想,为了缓解内存碎片的问题,page cache还需要尝试将还回来的span与其他空闲的span进行合并。(找到连续的span才会进行合并,这是在解决外碎片的问题)

page cache前后页的合并

    合并的过程可以分为向前合并和向后合并。如果还回来的span的起始页号是num,该span所管理的页数是n。那么在向前合并时,就需要判断第num-1页对应span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止。而在向后合并时,就需要判断第num+n页对应的span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止。(我们每一次都要尝试去合并,尽量的缓解外碎片问题)

我们就需要一个字段来标识这个span是否空闲,以便我们进行合并

//管理以页为单位的大块内存
struct Span
{
	PAGE_ID _pageId = 0;        //大块内存起始页的页号
	size_t _n = 0;              //页的数量

	Span* _next = nullptr;      //双链表结构
	Span* _prev = nullptr;

	size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数
	void* _freeList = nullptr;  //切好的小块内存的自由链表

	bool _isUse = false;        //是否在被使用
};

由于在合并page cache当中的span时,需要通过页号找到其对应的span,而一个span是在被分配给central cache时,才建立的各个页号与span之间的映射关系,因此page cache当中的span也需要建立页号与span之间的映射关系。

与central cache中的span不同的是,在page cache中,只需建立一个span的首尾页号与该span之间的映射关系。因为当一个span在尝试进行合并时,如果是往前合并,那么只需要通过一个span的尾页找到这个span,如果是向后合并,那么只需要通过一个span的首页找到这个span。也就是说,在进行合并时我们只需要用到span与其首尾页之间的映射关系就够了。

因此当我们申请k页的span时,如果是将n页的span切成了一个k页的span和一个n-k页的span,我们除了需要建立k页span中每个页与该span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系。

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	//检查一下后面的桶里面有没有span,如果有可以将其进行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的头部切k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//将剩下的挂到对应映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
			_idSpanMap[nSpan->_pageId] = nSpan;
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//尽量避免代码重复,递归调用自己
	return NewSpan(k);
}

此时page cache当中的span就都与其首尾页之间建立了映射关系,现在我们就可以进行span的合并了

//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	//对span的前后页,尝试进行合并,缓解内存碎片问题
	//1、向前合并
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		auto ret = _idSpanMap.find(prevId);
		//前面的页号没有(还未向系统申请),停止向前合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//前面的页号对应的span正在被使用,停止向前合并
		Span* prevSpan = ret->second;
		if (prevSpan->_isUse == true)
		{
			break;
		}
		//合并出超过128页的span无法进行管理,停止向前合并
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//进行向前合并
		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		//将prevSpan从对应的双链表中移除
		_spanLists[prevSpan->_n].Erase(prevSpan);

		delete prevSpan;
	}
	//2、向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		auto ret = _idSpanMap.find(nextId);
		//后面的页号没有(还未向系统申请),停止向后合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//后面的页号对应的span正在被使用,停止向后合并
		Span* nextSpan = ret->second;
		if (nextSpan->_isUse == true)
		{
			break;
		}
		//合并出超过128页的span无法进行管理,停止向后合并
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//进行向后合并
		span->_n += nextSpan->_n;

		//将nextSpan从对应的双链表中移除
		_spanLists[nextSpan->_n].Erase(nextSpan);

		delete nextSpan;
	}
	//将合并后的span挂到对应的双链表当中
	_spanLists[span->_n].PushFront(span);
	//建立该span与其首尾页的映射
	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_n - 1] = span;
	//将该span设置为未被使用的状态
	span->_isUse = false;
}

需要注意的是,在向前或向后进行合并的过程中:

  • 如果没有通过页号获取到其对应的span,说明对应到该页的内存块还未申请,此时需要停止合并。

  • 如果通过页号获取到了其对应的span,但该span处于被使用的状态,那我们也必须停止合并。

  • 如果合并后大于128页则不能进行本次合并,因为page cache无法对大于128页的span进行管理。

除此之外,在合并结束后,除了将合并后的span挂到page cache对应哈希桶的双链表当中,还需要建立该span与其首位页之间的映射关系,便于此后合并出更大的span。


七、释放内存过程联调

ConcurrentFree()函数

我们将thread cache、central cache以及page cache的释放流程也都写完了,此时我们就可以提供一个ConcurrentFree函数,用于释放 。

static void ConcurrentFree(void* ptr, size_t size/*暂时*/)
{
	assert(pTLSThreadCache);

	pTLSThreadCache->Deallocate(ptr, size);
}

释放内存过程联调测试

之前我们测试申请的时候,用单线程进行了3次内存申请,现在我们再进行3次释放

void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);

ConcurrentFree(p1, 6);
ConcurrentFree(p2, 8);
ConcurrentFree(p3, 1);

首先,这三次申请和释放的对象大小进行对齐后都应该是8字节。因此对应操作的就是thread cache和central cache的0号桶,page cache的1号桶

由于第三次对象申请时,刚好将thread cache当中仅剩的一个对象拿走了,因此在三次对象申请后thread cache的第0号桶是没有对象的(因为我们是慢开始算法,第一次申请一块,第二次申请两块,所以thread cache申请三次正好用完)。

当我们第一次释放的时候,此时thread cache0号桶中自由链表的_maxSize已经慢增长到了3,而0号桶的自由链表长度才是1,所以并不会向上回收。
在这里插入图片描述
第二次释放也是类似
在这里插入图片描述
直到第三次释放,自由链表长度等于我们慢开始的_maxSize,所以这时候thread cache就会释放,让central cache回收。
在这里插入图片描述thread cache将第0号桶当中的内存块弹出MaxSize个,也就是全部弹出,此时该自由链表_size的值变为0,然后继续调用central cache当中的ReleaseListToSpans函数,将这三个对象还给central cache当中对应的span。
在这里插入图片描述
在进入central cache的第0号桶还对象之前,先把第0号桶对应的桶锁加上,然后通过查page cache中的映射表找到其对应的span,最后将这个对象头插到该span的自由链表中,并将该span的_useCount进行–。当第一个对象还给其对应的span时,可以看到该span的_useCount减到了2。
在这里插入图片描述
而由于我们只进行了三次对象申请,并且这些对象大小对齐后大小都是8字节,因此我们申请的这三个对象实际都是同一个span切分出来的。当我们将这三个对象都还给这个span时,该span的_useCount就减为了0。

在这里插入图片描述

现在central cache就需要将这个span进一步还给page cache,而在将该span交给page cache之前,会将该span的自由链表以及前后指针都置空。并且在进入page cache之前会先将central cache第0号桶的桶锁解掉,然后再加上page cache的大锁,之后才能进入page cache进行相关操作。
在这里插入图片描述

由于这个一页的span是从128页的span的头部切下来的,在向前合并时由于前面的页还未向系统申请,因此在查映射关系时是无法找到的,此时直接停止了向前合并。

而在向后合并时,由于page cache没有将该页后面的页分配给central cache,因此在向后合并时肯定能够找到一个127页的span进行合并。合并后就变成了一个128页的span,这时我们将原来127页的span从第127号桶删除,然后还需要将该127页的span结构进行delete,因为它管理的127页已经与1页的span进行合并了,不再需要它来管理了。(重新调试后地址变了)
在这里插入图片描述
紧接着将这个128页的span插入到第128号桶,然后建立该span与其首尾页的映射,便于下次被用于合并,最后再将该span的状态设置为未被使用的状态即可。
在这里插入图片描述
当从page cache回来后,除了将page cache的大锁解掉,还需要立刻加上central cache中对应的桶锁,然后继续将对象还给central cache中的span,但此时实际上是还完了,因此再将central cache的桶锁解掉就行了。
在这里插入图片描述
完成了 三个对象的申请和释放流程

大于256KB的大块内存线程申请问题

申请过程

之前说过,每个线程的thread cache是用于申请小于等于256KB的内存的,而对于大与256KB的内存,我们可以考虑直接向page cache申请,但是page cache中最大的页也就只有128页,因此如果是比128页还大的内存,就只能直接向堆申请了。

申请内存的大小申请的对象
x<=256KBthread cache
256KB<x<=128页page cache
x>128页

我们之前面对大于256KB的申请是直接assert掉的,现在我们需要做下修改

//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
	if (bytes <= 128)
	{
		return _RoundUp(bytes, 8);
	}
	else if (bytes <= 1024)
	{
		return _RoundUp(bytes, 16);
	}
	else if (bytes <= 8 * 1024)
	{
		return _RoundUp(bytes, 128);
	}
	else if (bytes <= 64 * 1024)
	{
		return _RoundUp(bytes, 1024);
	}
	else if (bytes <= 256 * 1024)
	{
		return _RoundUp(bytes, 8 * 1024);
	}
	else
	{
		//大于256KB的按页对齐
		return _RoundUp(bytes, 1 << PAGE_SHIFT);
	}
}

申请逻辑的修改

static void* ConcurrentAlloc(size_t size)
{
	if (size > MAX_BYTES) //大于256KB的内存申请
	{
		//计算出对齐后需要申请的页数
		size_t alignSize = SizeClass::RoundUp(size);//字节数
		size_t kPage = alignSize >> PAGE_SHIFT;//页数

		//向page cache申请kPage页的span
		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kPage);
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		return ptr;//一个大内存拿去使用
	}
	else
	{
		//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
		if (pTLSThreadCache == nullptr)
		{
			pTLSThreadCache = new ThreadCache;
		}
		cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;

		return pTLSThreadCache->Allocate(size);
	}
}

我们只所有可以直接向page cache申请就是因为我们有unordered_map可以找到对于的页号和span的对于关系,那么这一块内存我们就可以自己管理,然后释放的时候返回给page cache一个span。

向page cache就需要调用NewSpan
大于128直接找堆,但是从堆这一块申请下来的空间也需要管理好。

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0);
	if (k > NPAGES - 1) //大于128页直接找堆申请
	{
		void* ptr = SystemAlloc(k);
		Span* span = new Span;
		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;
		//建立页号与span之间的映射
		_idSpanMap[span->_pageId] = span;
		return span;
	}
	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	//检查一下后面的桶里面有没有span,如果有可以将其进行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的头部切k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//将剩下的挂到对应映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
			_idSpanMap[nSpan->_pageId] = nSpan;
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//尽量避免代码重复,递归调用自己
	return NewSpan(k);
}

释放过程

释放内存大小释放的对象
x<=256KBthread cache
256KB<x<=128页page cache
x>128页

因此当释放对象时,我们需要先找到该对象对应的span,但是在释放对象时我们只知道该对象的起始地址。这也就是我们在申请大于256KB的内存时,也要给申请到的内存建立span结构,并建立起始页号与该span之间的映射关系的原因。此时我们就可以通过释放对象的起始地址计算出起始页号,进而通过页号找到该对象对应的span。

static void ConcurrentFree(void* ptr, size_t size)
{
	if (size > MAX_BYTES) //大于256KB的内存释放
	{
		Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);

		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

因此page cache在回收span时也需要进行判断,如果该span的大小是小于等于128页的,那么直接还给page cache进行了,page cache会尝试对其进行合并。而如果该span的大小是大于128页的,那么说明该span是直接向堆申请的,我们直接将这块内存释放给堆,调用封装好的SystemFree(),然后将这个span结构进行delete就行了。

//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	if (span->_n > NPAGES - 1) //大于128页直接释放给堆
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		delete span;
		return;
	}
	//对span的前后页,尝试进行合并,缓解内存碎片问题
	//1、向前合并
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		auto ret = _idSpanMap.find(prevId);
		//前面的页号没有(还未向系统申请),停止向前合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//前面的页号对应的span正在被使用,停止向前合并
		Span* prevSpan = ret->second;
		if (prevSpan->_isUse == true)
		{
			break;
		}
		//合并出超过128页的span无法进行管理,停止向前合并
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//进行向前合并
		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		//将prevSpan从对应的双链表中移除
		_spanLists[prevSpan->_n].Erase(prevSpan);

		delete prevSpan;
	}
	//2、向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		auto ret = _idSpanMap.find(nextId);
		//后面的页号没有(还未向系统申请),停止向后合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//后面的页号对应的span正在被使用,停止向后合并
		Span* nextSpan = ret->second;
		if (nextSpan->_isUse == true)
		{
			break;
		}
		//合并出超过128页的span无法进行管理,停止向后合并
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//进行向后合并
		span->_n += nextSpan->_n;

		//将nextSpan从对应的双链表中移除
		_spanLists[nextSpan->_n].Erase(nextSpan);

		delete nextSpan;
	}
	//将合并后的span挂到对应的双链表当中
	_spanLists[span->_n].PushFront(span);
	//建立该span与其首尾页的映射
	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_n - 1] = span;
	//将该span设置为未被使用的状态
	span->_isUse = false;
}

申请与释放系统调用

在Windows下,我们直接向堆申请可以调用VirtualAlloc() ,释放给堆可以调用VirtualFree()
在Linux下,向堆申请使用brk(),mmap(),释放给堆使用sbrk()和unmmap();

//直接将内存还给堆
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
	VirtualFree(ptr, 0, MEM_RELEASE);
#else
	//linux下sbrk unmmap等
#endif
}

简单测试
下面我们对大于256KB的申请释放流程进行简单的测试:

/找page cache申请
void* p1 = ConcurrentAlloc(257 * 1024); //257KB
ConcurrentFree(p1, 257 * 1024);

//找堆申请
void* p2 = ConcurrentAlloc(129 * 8 * 1024); //129页
ConcurrentFree(p2, 129 * 8 * 1024);

当申请257KB的内存时,由于257KB的内存按页向上对齐后是33页,并没有大于128页,因此不会直接向堆进行申请,会向page cache申请内存,但此时page cache当中实际是没有内存的,最终page cache就会向堆申请一个128页的span,将其切分成33页的span和95页的span,并将33页的span进行返回。(257KB也会被对齐)

在这里插入图片描述

而在释放内存时,由于该对象的大小大于了256KB,因此不会将其还给thread cache,而是直接调用的page cache当中的释放接口直接释放给page cache。

在这里插入图片描述

由于该对象的大小是33页,不大于128页,因此page cache也不会直接将该对象还给堆,而是尝试对其进行合并,最终就会把这个33页的span和之前剩下的95页的span进行合并,最终将合并后的128页的span挂到第128号桶中。
在这里插入图片描述


当申请129页的内存时,由于是大于256KB的,于是还是调用的page cache对应的申请接口,但此时申请的内存同时也大于128页,因此会直接向堆申请。在申请后还会建立该span与其起始页号之间的映射,便于释放时可以通过页号找到该span。

在这里插入图片描述

在释放内存时,通过对象的地址找到其对应的span,从span结构中得知释放内存的大小大于128页,于是会将该内存直接还给堆。

在这里插入图片描述

使用定长内存池脱离使用new

因为tcmalloc是要在高并发场景下替代malloc进行内存申请的,因此tcmalloc在实现的时候,其内部是不能去调用malloc,new等的。不调用new是因为底层是malloc。

为了完全脱离掉malloc函数,此时我们之前实现的定长内存池就起作用了(相当于一个小组件),代码中使用new时基本都是为Span结构的对象申请空间,而span对象基本都是在page cache层创建的,因此我们可以在PageCache类当中定义一个_spanPool,用于span对象的申请和释放。

其实本质也就是malloc,new等换成更底层的系统调用,只不过我们封装过定长内存池,就直接用了,而且它还可以进行初始化,何乐而不为

//单例模式
class PageCache
{
public:
	//...
private:
	ObjectPool<Span> _spanPool;
};

然后将代码中使用new的地方调用定长内存池当中的New函数,将代码中使用delete的地方替换为定长内存池当中的Delete。定长内存池的函数设计的时候是直接操作堆的

//申请span对象
Span* span = _spanPool.New();
//释放span对象
_spanPool.Delete(span);

每个线程创建其专属的thread cache,而这个thread cache目前也是new出来的,我们也需要对其进行替换。

//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
	static std::mutex tcMtx;
	static ObjectPool<ThreadCache> tcPool;
	tcMtx.lock();
	pTLSThreadCache = tcPool.New();
	tcMtx.unlock();
}

最后在SpanList的构造函数中也用到了new,因为SpanList是带头循环双向链表,所以在构造期间我们需要申请一个span对象作为双链表的头结点。

//带头双向循环链表
class SpanList
{
public:
	SpanList()
	{
		_head = _spanPool.New();
		_head->_next = _head;
		_head->_prev = _head;
	}
private:
	Span* _head;
	static ObjectPool<Span> _spanPool;
};

释放内存时怎么优化成不需要传大小

当我们使用malloc申请内存时,需要指明内存大小,而当我们使用free的时候,却不需要传大小,只需要这块空间的指针

而我们现在的内存池,在释放内存时需要指针和大小

void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);

ConcurrentFree(p1, 6);
ConcurrentFree(p2, 8);
ConcurrentFree(p3, 1);

原因如下:

  • 如果释放的是大于256KB的内存,需要根据对象的大小来判断这块内存到底还给谁。
  • 如果释放的是<=256KB的对象,需要根据对象的大小计算出应该还给thread cache的哪一个哈希桶

如果我们想做到不需要传入大小,那么就需要建立对象地址与对象大小之间的映射,由于现在可以通过页号(也可以说是对象的地址)找到对应的span,而span中挂的都是相同大小的对象。因此我们可以在span中再加一个_objSize成员,该成员代表着这个span管理的内存块被切成的一个个对象的大小

//管理以页为单位的大块内存
struct Span
{
	PAGE_ID _pageId = 0;        //大块内存起始页的页号
	size_t _n = 0;              //页的数量

	Span* _next = nullptr;      //双链表结构
	Span* _prev = nullptr;

	size_t _objSize = 0;        //切好的小对象的大小
	size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数
	void* _freeList = nullptr;  //切好的小块内存的自由链表

	bool _isUse = false;        //是否在被使用
};

而所有的span都是从page cache中拿出来的,因此每当我们调用NewSpan获得一个k页的span时,就应该将这个span的_objSize保存下来。

Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_objSize = size;

因为我们可以通过地址找到对应的span,里面又有被分割的大小,那么我们就可以直到该free掉多少了。

此时当我们释放对象时,就可以直接从对象的span中获取到该对象的大小,准确来说获取到的是对齐以后的大小。

static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;
	if (size > MAX_BYTES) //大于256KB的内存释放
	{
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

读取映射关系时的加锁问题

我们将页号与span之间的映射关系存储在PageCache类当中,当我们访问这个映射关系的时候是需要加锁的,因为STL容器他是不保证线程安全的。

对于当前的代码来说,如果我们正在page cache进行相关操作,那么访问这个映射关系时安全的,因为当进入page cache之前是需要加锁的

如果我们是在central cache访问这个映射关系,或是在调用ConcurrentFree函数释放内存时访问这个映射关系,就存在线程安全问题。因为此时可能其他线程正在page cache中进行某些操作。并且该线程此时可能也在访问这个映射关系,因此当我们在page cache外部访问这个映射关系时是需要加锁的。

实际就是在调用page cache对外提供访问映射关系的函数时需要加锁,这里我们可以考虑使用C++当中的unique_lock,当然你也可以用普通的锁。unique_lock就是说他是RAII的。(C++的锁都不允许拷贝)

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号

	std::unique_lock<std::mutex> lock(_pageMtx); //构造时加锁,析构时自动解锁
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

多线程环境下对比malloc测试

之前我们只是对代码进行了一些基础的单元测试,下面我们在多线程场景下对比malloc进行测试。

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);//n次开辟的空间存储起来
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(malloc(16));
					//v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);
	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);
	printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(ConcurrentAlloc(16));
					//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);
	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);
	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" <<
		endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;
	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<
		endl;
	return 0;
}

参数含义:

  • ntimes:单轮此申请和释放的次数
  • nworks:线程数
  • rounds:伦次

在测试函数中,我们通过clock函数分别获取到每轮次申请和释放所花费的时间,然后将其对应累加到malloc_costtime和free_costtime上。最后我们就得到了,nworks个线程跑rounds轮,每轮申请和释放ntimes次,这个过程申请所消耗的时间、释放所消耗的时间、申请和释放总共消耗的时间。

注意,我们创建线程时让线程执行的是lambda表达式,而我们这里在使用lambda表达式时,以值传递的方式捕捉了变量k,以引用传递的方式捕捉了其他父作用域中的变量,因此我们可以将各个线程消耗的时间累加到一起。

我们将所有线程申请内存消耗的时间都累加到malloc_costtime上, 将释放内存消耗的时间都累加到free_costtime上,此时malloc_costtime和free_costtime可能被多个线程同时进行累加操作的,所以存在线程安全的问题。鉴于此,我们在定义这两个变量时使用了atomic类模板,这时对它们的操作就是原子操作了。


固定大小内存的申请和释放

v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16));

此时4个线程执行10轮操作,每轮申请释放10000次,总共申请释放了40万次,运行后可以看到,malloc的效率还是更高的。

在这里插入图片描述
由于此时我们申请释放的都是固定大小的对象,每个线程申请释放时访问的都是各自thread cache的同一个桶,当thread cache的这个桶中没有对象或对象太多要归还时,也都会访问central cache的同一个桶。此时central cache中的桶锁就不起作用了,因为我们让central cache使用桶锁的目的就是为了,让多个thread cache可以同时访问central cache的不同桶,而此时每个thread cache访问的却都是central cache中的同一个桶。


不同大小内存的申请和释放

v.push_back(malloc((16 + i) % 8192 + 1));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));

运行后可以看到,由于申请和释放内存的大小是不同的,此时central cache当中的桶锁就起作用了,ConcurrentAlloc的效率也有了较大增长,但相比malloc来说还是差一点点。(因为malloc本身也是一个内存池,效率还是很优的,我们的策略对于多线程虽然更好,但是我们还没优化完,所以效率暂时还不行)在这里插入图片描述

性能瓶颈分析

经过前面的测试可以看到,我们的代码此时与malloc之间还是有差距的,此时我们就应该分析分析我们当前项目的瓶颈在哪里,但这不能简单的凭感觉,我们应该用性能分析的工具来进行分析。(VS编译器自带的性能分析工具 ,“调试-> 性能和诊断”),注意该操作要在Debug模式下进行

在这里插入图片描述
同时我们将代码中n的值由10000调成了1000,否则该分析过程可能会花费较多时间,并且将malloc的测试代码进行了屏蔽,因为我们要分析的是我们实现的高并发内存池。

int main()
{
	size_t n = 1000;
	cout << "==========================================================" <<
		endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;
	//BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<
		endl;
	return 0;
}

分析性能瓶颈

我们可以看到,光是Deallocate和MapObjectToSpan这两个函数就占用了一半多的时间。在这里插入图片描述
而在Deallocate函数中,调用ListTooLong函数时消耗的时间是最多的。
在这里插入图片描述
在ListTooLong函数中,调用ReleaseListToSpans函数时消耗的时间是最多的。
在这里插入图片描述
在ReleaseListToSpans函数中,调用MapObjectToSpan函数时消耗的时间是最多的。
在这里插入图片描述
也就是说,最终消耗时间最多的实际就是MapObjectToSpan函数,我们这时再来看看为什么调用MapObjectToSpan函数会消耗这么多时间。通过观察我们最终发现,调用该函数时会消耗这么多时间就是因为锁的原因。(最终得出瓶颈在于page cache的整体加锁上面)
在这里插入图片描述
因此当前项目的瓶颈点就在锁竞争上面,需要解决调用MapObjectToSpan函数访问映射关系时的加锁问题。tcmalloc当中针对这一点使用了基数树进行优化,使得在读取这个映射关系时可以做到不加锁。

因为访问一次就要将整个哈希桶上锁的因素呢,导致操作和访问page cache都将是线性的,而不能够并发,所以这对性能的影响是很大的


针对性能瓶颈使用基数树进行优化

基数树实际上就是分层哈希,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等。

单层基数树

单层基数树实际采用的就是直接定址法(哈希表),每一个页号对应span的地址就存储数组中在以该页号为下标的位置。

在这里插入图片描述

一层基数树我们需要建立所有页号和span之间的映射关系,因此这个数组中元素个数应该与页号的数目相同,数组中每个位置存储的就是对应span的指针。

//单层基数树
template <int BITS>
class TCMalloc_PageMap1
{
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap1()
	{
		size_t size = sizeof(void*) << BITS; //需要开辟数组的大小
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按页对齐后的大小
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申请空间
		memset(array_, 0, size); //对申请到的内存进行清理
	}
	void* get(Number k) const
	{
		if ((k >> BITS) > 0) //k的范围不在[0, 2^BITS-1]
		{
			return NULL;
		}
		return array_[k]; //返回该页号对应的span
	}
	void set(Number k, void* v)
	{
		assert((k >> BITS) == 0); //k的范围必须在[0, 2^BITS-1]
		array_[k] = v; //建立映射
	}
private:
	void** array_; //存储映射关系的数组
	static const int LENGTH = 1 << BITS; //页的数目

此时当我们需要建立映射时就调用set函数,需要读取映射关系时,就调用get函数就行了。

代码中的非类型模板参数BITS表示储页号最多需要比特位的个数,在32位下我们传入的是32-PAGE_SHIFT,在64为下传入64-PAGE_SHIFT。而其中的LENGTH成员代表的就是页号的数目,即2BITS

如果32位平台下,以一页大小为8K为例,此时页数就是232 ÷ 2 13 = 219,因此存储页号最多需要19个比特位,此时传入非类型模板参数的值就是32 - 13 = 19.由于32位平台指针的大小是4字节,因此该数组的大小就是 219 × 4 = 221 = 2M ,内存消耗不大,是可行的。但如果是在64位平台下,此时该数组的大小是 251 × 8 = 254 = 224G,这显然不可能,对于64位平台,我们需要使用三层基数树.


二层基数树

还是以32位平台下,一页的大小为8K为里来看。此时最多需要19个比特位。而二层基数数就是把这19个比特位分为两次进行映射。

比如用前5个比特位在第一层,映射后得到对应的第二层。然后用剩下的在第二层进行映射,映射后最终得到该页号对应的span指针。(分层哈希)
在这里插入图片描述

在二层基数树中,第一层的数组占用25 × 4 = 27Byte空间,第二层的数组最多占用25 × 214 × 4 = 221 = 2M。二层基数树我们算出来的和一层的需要的空间是一样的阿?其实二层基数树的好处就在于。一层基数树一开始就把2M的数组开辟出来,而二层基数树一开始只需将第一层的数组开辟出来,当需要进行某一页号映射时再开辟对应的第二层的数组

//二层基数树
template <int BITS>
class TCMalloc_PageMap2
{
private:
	static const int ROOT_BITS = 5;                //第一层对应页号的前5个比特位
	static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层存储元素的个数
	static const int LEAF_BITS = BITS - ROOT_BITS; //第二层对应页号的其余比特位
	static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层存储元素的个数
	//第一层数组中存储的元素类型
	struct Leaf
	{
		void* values[LEAF_LENGTH];
	};
	Leaf* root_[ROOT_LENGTH]; //第一层数组
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap2()
	{
		memset(root_, 0, sizeof(root_)); //将第一层的空间进行清理
		PreallocateMoreMemory(); //直接将第二层全部开辟
	}
	void* get(Number k) const
	{
		const Number i1 = k >> LEAF_BITS;        //第一层对应的下标
		const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
		if ((k >> BITS) > 0 || root_[i1] == NULL) //页号值不在范围或没有建立过映射
		{
			return NULL;
		}
		return root_[i1]->values[i2]; //返回该页号对应span的指针
	}
	void set(Number k, void* v)
	{
		const Number i1 = k >> LEAF_BITS;        //第一层对应的下标
		const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
		assert(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v; //建立该页号与对应span的映射
	}
	//确保映射[start,start_n-1]页号的空间是开辟好了的
	bool Ensure(Number start, size_t n)
	{
		for (Number key = start; key <= start + n - 1;)
		{
			const Number i1 = key >> LEAF_BITS;
			if (i1 >= ROOT_LENGTH) //页号超出范围
				return false;
			if (root_[i1] == NULL) //第一层i1下标指向的空间未开辟
			{
				//开辟对应空间
				static ObjectPool<Leaf> leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();
				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查
		}
		return true;
	}
	void PreallocateMoreMemory()
	{
		Ensure(0, 1 << BITS); //将第二层的空间全部开辟好
	}
};

(Ensure确保)在二层基数树中有一个Ensure函数,当需要建立某一页号与其span之间的映射关系时,需要先调用该Ensure函数确保用于映射该页号的空间是开辟了的,如果没有开辟则会立即开辟。

而在32位平台下,就算将二层基数树第二层的数组全部开辟出来也就消耗了2M的空间,内存消耗也不算太多,因此我们可以在构造二层基数树时就把第二层的数组全部开辟出来。(当然用的时候再开辟更好)


> 三层基数数

上面一层基数树和二层基数树都适用于32位平台,而对于64位的平台就需要用三层基数树了。三层基数树与二层基数树类似,三层基数树实际上就是把存储页号的若干比特位分为三次进行映射。
在这里插入图片描述此时只有当要建立某一页号的映射关系时,再开辟对应的数组空间,而没有建立映射的页号就可以不用开辟其对应的数组空间,此时就能在一定程度上节省内存空间。

//三层基数树
template <int BITS>
class TCMalloc_PageMap3
{
private:
	static const int INTERIOR_BITS = (BITS + 2) / 3;       //第一、二层对应页号的比特位个数
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二层存储元素的个数
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三层对应页号的比特位个数
	static const int LEAF_LENGTH = 1 << LEAF_BITS;         //第三层存储元素的个数
	struct Node
	{
		Node* ptrs[INTERIOR_LENGTH];
	};
	struct Leaf
	{
		void* values[LEAF_LENGTH];
	};
	Node* NewNode()
	{
		static ObjectPool<Node> nodePool;
		Node* result = nodePool.New();
		if (result != NULL)
		{
			memset(result, 0, sizeof(*result));
		}
		return result;
	}
	Node* root_;
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap3()
	{
		root_ = NewNode();
	}
	void* get(Number k) const
	{
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
		const Number i3 = k & (LEAF_LENGTH - 1);                    //第三层对应的下标
		//页号超出范围,或映射该页号的空间未开辟
		if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL)
		{
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回该页号对应span的指针
	}
	void set(Number k, void* v)
	{
		assert(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
		const Number i3 = k & (LEAF_LENGTH - 1);                    //第三层对应的下标
		Ensure(k, 1); //确保映射第k页页号的空间是开辟好了的
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立该页号与对应span的映射
	}
	//确保映射[start,start+n-1]页号的空间是开辟好了的
	bool Ensure(Number start, size_t n)
	{
		for (Number key = start; key <= start + n - 1;)
		{
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下标值超出范围
				return false;
			if (root_->ptrs[i1] == NULL) //第一层i1下标指向的空间未开辟
			{
				//开辟对应空间
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}
			if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二层i2下标指向的空间未开辟
			{
				//开辟对应空间
				static ObjectPool<Leaf> leafPool;
				Leaf* leaf = leafPool.New();
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查
		}
		return true;
	}
	void PreallocateMoreMemory()
	{}
};

因此当我们要建立某一页号的映射关系时,需要先确保存储该页映射的数组空间是开辟好了的,也就是调用代码中的Ensure函数,如果对应数组空间未开辟则会立马开辟对应的空间。

对于64位平台,我们上面计算过,可不敢在高层基数树把空间全部开辟,一定是用什么开辟什么

使用基数树进行优化代码实现

现在我们用基数树对代码进行优化,此时将PageCache类当中的unorder_map用基数树进行替换即可,由于当前是32位平台,因此这里随便用几层基数树都可以

//单例模式
class PageCache
{
public:
	//...
private:
	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;//开辟多大的一个哈希表(全部开好)
};

此时当我们需要建立页号与span的映射时,就调用基数树当中的set函数。

_idSpanMap.set(span->_pageId, span);

而当我们需要读取某一页号对应的span时,就调用基数树当中的get函数。

Span* ret = (Span*)_idSpanMap.get(id);

并且现在PageCache类向外提供的,用于读取映射关系的MapObjectToSpan函数内部就不需要加锁了。

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
	Span* ret = (Span*)_idSpanMap.get(id);
	assert(ret != nullptr);
	return ret;
}

读取基数树映射关系不需要加锁的原因

首先是因为容器的原因,比如我们使用的map或者unordered_map,底层对应红黑树和哈希表。那么他们在建立映射的时候,就可能会发生结构变化,比如红黑树变色旋转等,哈希表可能发生扩容,导致重新建立映射。而基数树是一次性开辟好(一层),那么所以以后的Span*的位置就是确定的,不用担心因为结构变化访问到其他元素(就算没有开辟完,结构的位置也是确定的,开辟的时候该在哪儿就在哪儿。)
②我们对某一个内存的操作只有“一次”,不可能是两个人在使用同一块内存,我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,而建立映射的操作都是在page cache进行的。也就是说,读取映射时读取的都是对应span的_useCount不等于0的页,而建立映射时建立的都是对应span的_useCount等于0的页,所以说我们不会同时对同一个页进行读取映射和建立映射的操作。

再次对比malloc进行测试

同样的代码,只不过我们用基数树对代码进行了优化,这时测试固定大小内存的申请和释放的结果如下:
在这里插入图片描述
可以看到,这时就算申请释放的是固定大小的对象,其效率都是malloc的两倍。下面在申请释放不同大小的对象时,由于central cache的桶锁起作用了,其效率更是变成了malloc的8倍以上,我测试过超过10倍的性能(可能产生的随机数正好落在不同的桶概率较大)。
在这里插入图片描述

打包成动静态库

实际Google开源的tcmalloc是会直接用于替换malloc的
不同平台替换的方式不同。
比如Linux下的gcc,他就采用alias的方式去进行替换
其他平台可能不支持,需要使用hook的钩子技术来做。钩子技术就比如说调用的时候我把它勾过来,换成我这个

对于我们当前实现的项目,可以考虑将其打包成静态库或动态库。我们先右击解决方案资源管理器当中的项目名称,然后选择属性。

在这里插入图片描述
在这里插入图片描述

项目源码

Gitee: https://gitee.com/zhu-pi/zhupi-project/tree/master/ConcurrentMemoryPool
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/527023.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

51单片机(十一)蜂鸣器

❤️ 专栏简介&#xff1a;本专栏记录了从零学习单片机的过程&#xff0c;其中包括51单片机和STM32单片机两部分&#xff1b;建议先学习51单片机&#xff0c;其是STM32等高级单片机的基础&#xff1b;这样再学习STM32时才能融会贯通。 ☀️ 专栏适用人群 &#xff1a;适用于想要…

使用ntpd同步服务端时间

安装ntpd 服务服务&#xff1a;yum install ntp -y ps&#xff1a;离线安装 1. 下载rpm离线包&#xff1a;autogen-libopts-5.18-5.el7.x86_64.rpm、ntp-4.2.6p5-29.el7.centos.2.x86_64.rpm、ntpdate-4.2.6p5-29.el7.centos.2.x86_64.rpm 2. 安装&#xff1a;rpm -Uvh --force…

logback按天归档日志

效果图 logback.xml文件配置 <configuration debug"false"><!--日志输出到文件--><appender name"BaseLogFile" class"ch.qos.logback.core.rolling.RollingFileAppender"><file>logs/logback.log</file><…

计算机网络 | 五种I/O模型

欢迎关注博主 Mindtechnist 或加入【Linux C/C/Python社区】一起学习和分享Linux、C、C、Python、Matlab&#xff0c;机器人运动控制、多机器人协作&#xff0c;智能优化算法&#xff0c;滤波估计、多传感器信息融合&#xff0c;机器学习&#xff0c;人工智能等相关领域的知识和…

用《文心一言》1分钟写一篇博客简直yyds

文章目录 前言文心一言是什么文心一言可以做什么文心一言写博客申请体验写在最后 前言 当今社会&#xff0c;博客已成为了许多人分享观点、知识和经验的重要平台。用文心一言写博客是将自己的思考、想法和经验以文字的形式呈现出来&#xff0c;让更多人了解自己。通过写博客&a…

SpringBoot——整合Junit测试

简单介绍&#xff1a; 其实Spring Boot的测试在之前我们就已经使用过了&#xff0c;只不过当时我们只是使用&#xff0c;并不知道他具体是怎么实现的&#xff0c;我们稍微介绍一下他在做测试的时候是怎么实现的&#xff1a; SpringBoot的测试&#xff1a; 在test文件夹下面&a…

C语言CRC-16 CCITT格式校验函数

C语言CRC-16 CCITT格式校验函数 CRC-16校验产生2个字节长度的数据校验码&#xff0c;通过计算得到的校验码和获得的校验码比较&#xff0c;用于验证获得的数据的正确性。基本的CRC-16校验算法实现&#xff0c;参考&#xff1a; C语言标准CRC-16校验函数。 不同同应用规范通过…

国产数据采集虚拟仪器板卡结合labview的应用

众所周知&#xff0c;虚拟仪器技术是根据用户的需求由软件定义通用测试硬件功能的系统。 通过将可重复配置的硬件应用到一个虚拟仪器系统&#xff0c;工程师可以使用软件来开发算法并把它们应用到一个嵌入式芯片&#xff0c;从而把虚拟仪器软件的可配置能力扩展至硬件。 以前只…

MySQL 主从复制与分离

基本概念 什么是读写分离 读写分离&#xff0c;基本的原理是让主数据库处理事务性增、改、删操作&#xff08;INSERT、UPDATE、DELETE&#xff09;&#xff0c;而从数据库处理SELECT查询操作。数据库复制被用来把事务性操作导致的变更同步到集群中的从数据库。 为什么要读写分…

详解线程与进程的区别

文章目录 一、进程&#xff08;Process&#xff09;二、线程&#xff08;Thread&#xff09;三、线程与进程的区别 一、进程&#xff08;Process&#xff09; 有关进程的介绍我们在前文已经详细介绍&#xff0c;有疑问的同学可以戳这里->操作系统与进程调度 我们知道&…

Guitar Pro8吉他谱编写软件下载安装及使用教程

音乐制作的许多程序都可以借助软件来完成&#xff0c;吉他谱的编写也是如此。今天要和大家分享的是吉他谱编写软件Guitar Pro&#xff0c;吉他谱怎么做电子版。 提到吉他谱的编写&#xff0c;有一款软件总是被第一时间想到&#xff0c;那就是Guitar Pro。 Guitar Pro是一款专业…

MHA高可用与故障切换

一、MHA的概述 1、 MHA的概念 MHA&#xff08;MasterHigh Availability&#xff09;是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。 MySQL故障切换过程中&#xff0c;MHA能做到0-30秒内自动完成故障切换操作。 MHA能在故…

快速幂算法 Pow(x,n)函数的实现_20230515

快速幂算法 Pow(x,n)函数的实现 前言 如果要实现x的整数n次幂(xn)&#xff0c;那么可以采用不同的策略&#xff0c;最直观和简单的算法就是利用递归或迭代把n个x连续相乘起来&#xff0c;从而获得幂乘结果。显而易见&#xff0c;此算法至少需要O(n)次运算&#xff0c;n比较大…

大语言模型友好的 API:借助集体智慧构建更好的软件架构

在过去的几个月里&#xff0c;我们一直在探索&#xff1a;如何将大语言模型用于研发效能提升&#xff1f;而随着&#xff0c;我们研究的逐步推进&#xff0c;慢慢进入一些深水区&#xff0c;诸如于&#xff1a;如何将 AI 更好地用于辅助架构设计&#xff1f;基于我们多年的架构…

​目标检测:如何有效实现YOLOv8

本文是一篇实用指南&#xff0c;介绍了如何使用命令行界面和Python来对图像、视频和实时网络摄像头中的物体进行检测。 介绍 目标检测是计算机视觉的一个子领域&#xff0c;主要涉及在图像或视频中以一定的置信度识别和定位物体。识别出的物体通常带有一个边界框&#xff0c;提…

c++ 11标准模板(STL) std::set(三)

定义于头文件 <set> template< class Key, class Compare std::less<Key>, class Allocator std::allocator<Key> > class set;(1)namespace pmr { template <class Key, class Compare std::less<Key>> using se…

如何使用chatgpt生成精美PPT提高工作效率

本教程收集于:AIGC从入门到精通教程 如何快速生成精美PPT提高工作效率 一、ChatGPT生成markdown源代码 二、Mindshow登录/注册 三、导入markd

Redis缓存架构详解

文章目录 Redis缓存结构详解前言Redis 缓存架构redis 和db数据一致性先写db还是写redis如果是先写db,再删除缓存呢&#xff1f;延迟双删 简单的缓存,并发不高,没啥流量简单的缓存,并发高,但是存在redis和 Db 双写不一致,读写并发不一致问题解决方案 1解决方案 2解决方案 3读写锁…

Linux常用命令——iconv命令

在线Linux命令查询工具 iconv 转换文件的编码方式 补充说明 iconv命令是用来转换文件的编码方式的&#xff0c;比如它可以将UTF8编码的转换成GB18030的编码&#xff0c;反过来也行。JDK中也提供了类似的工具native2ascii。Linux下的iconv开发库包括iconv_open,iconv_close,…

《程序员面试金典(第6版)》面试题 16.22. 兰顿蚂蚁(哈希映射,C++)

题目描述 一只蚂蚁坐在由白色和黑色方格构成的无限网格上。开始时&#xff0c;网格全白&#xff0c;蚂蚁面向右侧。每行走一步&#xff0c;蚂蚁执行以下操作。传送门 (1) 如果在白色方格上&#xff0c;则翻转方格的颜色&#xff0c;向右(顺时针)转 90 度&#xff0c;并向前移动…