C++项目--高并发内存池

news2025/1/12 10:03:55

目录

  • 一、项目介绍
  • 二、内存池介绍
    • 2.1 池化技术
    • 2.2 内存池
    • 2.3 内存池主要解决的问题
    • 2.4 malloc
  • 三、定长内存池的实现
    • 3.1 定长内存池概念
    • 3.2 内存池管理释放对象
    • 3.3 内存池申请对象
    • 3.4 定长内存池整体代码
    • 3.5 性能对比
  • 四、高并发内存池整体框架设计
    • 4.1 该项目解决的问题
    • 4.2 整体框架结构
    • 4.3 各部分的主要作用
  • 五、threadcache
    • 5.1 threadcache整体设计
    • 5.2 threadcache哈希桶映射对齐规则
      • 5.2.1 映射对齐规则
      • 5.2.2 空间浪费率
      • 5.2.3 对齐和映射相关函数的编写
      • 5.2.4 ThreadCache类
    • 5.3 threadcache TLS无锁访问
  • 六、centralcache
    • 6.1 centralcache整体设计
      • 6.1.1 central cache与thread cache的不同之处
    • 6.2 centralcache结构设计
      • 6.2.1 页号的类型
      • 6.2.2 span的结构
      • 6.2.3 双链表结构
      • 6.2.4 central cache的结构
    • 6.3 centralcache核心实现
      • 6.3.1 central cache的实现方式
      • 6.3.2 慢开始反馈调节算法
      • 6.3.3 从中心缓存获取对象
      • 6.3.4 从中心缓存获取一定数量对象
      • 6.3.5 插入或删除一段范围的自由链表对象
  • 七、pagecache
    • 7.1 pagecache整体设计
      • 7.1.1 在page cache获取一个n页的span过程
      • 7.1.2 page cache的实现方式
    • 7.2 pagecache中获取Span
      • 7.2.1 获取一个非空的span
      • 7.2.2 获取一个k页的span
  • 八、申请内存过程联调
    • 8.1 ConcurrentAlloc函数
    • 8.2 申请内存过程联调测试(上)
    • 8.3 申请内存过程联调测试(下)
  • 九、threadcache回收内存
  • 十、centralcache回收内存
  • 十一、pagecache回收内存
    • 11.1 page cache进行前后页的合并
  • 十二、释放内存过程联调
    • 12.1 ConcurrentFree函数
  • 十三、大于256KB的大块内存申请问题
    • 13.1 申请过程
    • 13.2 释放过程
  • 十四、使用定长内存池配合脱离使用new
  • 十五、释放对象时优化为不传对象大小
    • 15.1 读取映射关系时的加锁问题
  • 十六、多线程环境下对比malloc测试
    • 16.1 固定大小内存的申请和释放
    • 16.2 不同大小内存的申请和释放
  • 十七、复杂问题的调试技巧
    • 17.1 条件断点
    • 17.2 查看函数栈帧
    • 17.3 疑似死循环时中断程序
  • 十八、性能瓶颈分析
    • 18.1 VS编译器下性能分析的操作步骤
    • 18.2 分析性能瓶颈
  • 十九、针对性能瓶颈使用基数树进行优化
    • 19.1 单层基数树
    • 19.2 二层基数树
    • 19.3 三层基数树
  • 二十、使用基数树进行优化代码实现
    • 20.1 代码更改说明
    • 20.2 读取基数树映射关系时不需要加锁
    • 20.3 再次对比malloc进行测试
  • 二十一、项目源码

一、项目介绍

  本项目是实现一个高并发内存池,它的原型是Google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
在这里插入图片描述
  这个项目主要把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习tcmalloc的精华。该项目主要会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的知识。

二、内存池介绍

2.1 池化技术

  所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
  在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。

2.2 内存池

  内存池是指程序预先从操作系统申请一块足够大的内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

2.3 内存池主要解决的问题

  内存池主要解决的就是效率问题,它能够避免让程序频繁的向系统申请和释放内存。其次,内存池作为系统的内存分配器,还需要尝试解决内存碎片的问题。
在这里插入图片描述
内存碎片分为内碎片和外碎片

  • 外部碎片是一些空闲的小块内存区域,由于这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配申请需求。
  • 内碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。

注意:内存池尝试解决的是外碎片的问题,同时也尽可能的减少内碎片的产生。

2.4 malloc

  C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而malloc就是一个内存池。malloc()相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
在这里插入图片描述

  malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,Linux gcc用的glib中的ptmalloc。

三、定长内存池的实现

3.1 定长内存池概念

  作为C/C++程序员,我们知道申请内存使用的是malloc,malloc其实就是一个通用的内存池,什么场景下都可以使用,但是什么场景下都可以使用就意味着什么场景下都不会有很高的性能,因为malloc并不是针对某种场景专门设计的。
在这里插入图片描述

  定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需支持固定大小内存块的申请和释放,因此我们可以将其性能做到极致,并且在实现定长内存池时不需要考虑内存碎片等问题,因为我们申请/释放的都是固定大小的内存块。
  我们可以通过实现定长内存池来熟悉一下对简单内存池的控制,其次,这个定长内存池后面会作为高并发内存池的一个基础组件。

实现定长
  在实现定长内存池时要做到“定长”有很多种方法,比如我们可以使用非类型模板参数,使得在该内存池中申请到的对象的大小都是N。

//定长内存池
template<size_t N>
class ObjectPool
{};

  此外,定长内存池也叫做对象池,在创建对象池时,对象池可以根据传入的对象类型的大小来实现“定长”,因此我们可以通过使用模板参数来实现“定长”,比如创建定长内存池时传入的对象类型是int,那么该内存池就只支持4字节大小内存的申请和释放。

template<class T>
class ObjectPool
{};

直接向堆申请空间
  既然是内存池,那么我们首先得向系统申请一块内存空间,然后对其进行管理。要想直接向堆申请内存空间,在Windows下,可以调用VirtualAlloc函数;在Linux下,可以调用brk或mmap函数。

#ifdef _WIN32
	#include<windows.h>
#else
// 
#endif

// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif
	if (ptr == nullptr)
		throw std::bad_alloc();
	return ptr;
}

  这里我们可以通过条件编译将对应平台下向堆申请内存的函数进行封装,此后我们就不必再关心当前所在平台,当我们需要直接向堆申请内存时直接调用我们封装后的SystemAlloc函数即可。

定长内存池中应该包含哪些成员变量
  对于向堆申请到的大块内存,我们可以用一个指针来对其进行管理,但仅用一个指针肯定是不够的,我们还需要用一个变量来记录这块内存的长度。
  由于后面需要将这块内存进行切分,为了方便切分操作,指向这块内存的指针最好是字符指针,因为指针的类型决定了指针向前或向后走一步有多大距离,对于字符指针来说,当我们需要向后移动n个字节时,直接对字符指针进行加n操作即可。
在这里插入图片描述
  其次,释放回来的定长内存块也需要被管理,我们可以将这些释放回来的定长内存块链接成一个链表,这里我们将管理释放回来的内存块的链表叫做自由链表,为了能够找到这个自由链表,我们还需要一个指向自由链表的指针。
在这里插入图片描述
定长内存池当中包含三个成员变量

  • _memory:指向大块内存的指针
  • _remainBytes:大块内存切分过程中剩余字节数
  • _freeList:还回来过程中链接的自由链表的头指针

3.2 内存池管理释放对象

  对于还回来的定长内存块,我们可以用自由链表将其链接起来,但我们并不需要为其专门定义链式结构,我们可以让内存块的前4个字节(32位平台)或8个字节(64位平台)作为指针,存储后面内存块的起始地址即可。
  因此在向自由链表插入被释放的内存块时,先让该内存块的前4个字节或8个字节存储自由链表中第一个内存块的地址,然后再让_freeList指向该内存块即可,也就是一个简单的链表头插操作。
在这里插入图片描述

如何让一个指针在32位平台下解引用后能向后访问4个字节,在64位平台下解引用后能向后访问8个字节?
  首先,32位平台下指针的大小是4个字节,64位平台下指针的大小是8个字节。而指针指向数据的类型,决定了指针解引用后能向后访问的空间大小,因此我们需要的是一个指向指针的指针,这里使用二级指针就行了。
  当我们需要访问一个内存块的前4/8个字节时,我们就可以将该内存块的地址强转为二级指针,由于二级指针存储的是一级指针的地址,二级指针解引用能向后访问一个指针的大小,因此在32位平台下访问的就是4个字节,在64位平台下访问的就是8个字节,此时我们就访问到了该内存的前4/8个字节。

static void*& NextObj(void* obj)
{
	return *(void**)obj;
}

  需要注意的是,在释放对象时,我们应该显式调用该对象的析构函数清理该对象,因为该对象可能还管理着其他某些资源,如果不对其进行清理,那么这些资源将无法被释放,就会导致内存泄漏。

void Delete(T* obj)     //切分出去对象进行链接
	{
		//显式调用析构函数清理对象
		obj->~T();

		//if (_freeList == nullptr)
		//{
		//	_freeList = obj;
		//	//*(int*)obj = nullptr;    //使用int在32位下访问4字节,程序没有问题,在64位下程序就有问题
		//	*(void**)obj = nullptr;
		//}
		//else
		//{
		//	//头插
		//	*(void**)obj = _freeList;
		//	_freeList = obj;
		//}

		//头插(上面两种情况对于有没有结点都适用,可以合并为一种情况)
		*(void**)obj = _freeList;    //void*在32位下是4字节,在64位下是8字节
		_freeList = obj;
	}

3.3 内存池申请对象

  当我们申请对象时,内存池应该优先把还回来的内存块对象再次重复利用,因此如果自由链表当中有内存块的话,就直接从自由链表头删一个内存块进行返回即可。
在这里插入图片描述
  如果自由链表当中没有内存块,那么我们就在大块内存中切出定长的内存块进行返回,当内存块切出后及时更新_memory指针的指向,以及_remainBytes的值即可。

  需要特别注意的是,由于当内存块释放时我们需要将内存块链接到自由链表当中,因此我们必须保证切出来的对象至少能够存储得下一个地址,所以当对象的大小小于当前所在平台指针的大小时,需要按指针的大小进行内存块的切分。
  此外,当大块内存已经不足以切分出一个对象时,我们就应该调用封装的SystemAlloc函数,再次向堆申请一块内存空间,此时也要注意及时更新_ memory指针的指向,以及_remainBytes的值。

T* New()     //开空间,切分对象
	{
		T* obj = nullptr;

		//优先把还回来的内存块对象,再次重复利用
		if (_freeList)    //头删
		{
			void* next = *((void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
			
		}
		else
		{
			//剩余内存不够一个对象大小时,则重新开大块空间
			if (_remainBytes < sizeof(T))
			{
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remainBytes);
				_memory = (char*)SystemAlloc(_remainBytes>>13);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}

			obj = (T*)_memory;
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);   //objSize保证每次切分对象的地址能存下一个指针大小
			_memory += objSize;
			_remainBytes -= objSize;
		}	

		//定位new,显式调用T的构造函数初始化 (对已经有的空间进行初始化)
		new(obj)T;

		return obj;
	}

注意:与释放对象时需要显式调用该对象的析构函数一样,当内存块切分出来后,我们应该使用定位new,显式调用该对象的构造函数对其进行初始化。

3.4 定长内存池整体代码

#ifdef _WIN32
	#include<windows.h>
#else
// 
#endif

定长内存池
//template<size_t N>
//class ObjectPool
//{};

// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif
	if (ptr == nullptr)
		throw std::bad_alloc();

	return ptr;
}

template<class T>
class ObjectPool
{
public:
	T* New()     //开空间,切分对象
	{
		T* obj = nullptr;

		//优先把还回来的内存块对象,再次重复利用
		if (_freeList)    //头删
		{
			void* next = *((void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
			
		}
		else
		{
			//剩余内存不够一个对象大小时,则重新开大块空间
			if (_remainBytes < sizeof(T))
			{
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remainBytes);
				_memory = (char*)SystemAlloc(_remainBytes>>13);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}

			obj = (T*)_memory;
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);   //objSize保证每次切分对象的地址能存下一个指针大小
			_memory += objSize;
			_remainBytes -= objSize;
		}	

		//定位new,显式调用T的构造函数初始化 (对已经有的空间进行初始化)
		new(obj)T;

		return obj;
	}

	void Delete(T* obj)     //切分出去对象进行链接
	{
		//显式调用析构函数清理对象
		obj->~T();

		//if (_freeList == nullptr)
		//{
		//	_freeList = obj;
		//	//*(int*)obj = nullptr;    //使用int在32位下访问4字节,程序没有问题,在64位下程序就有问题
		//	*(void**)obj = nullptr;
		//}
		//else
		//{
		//	//头插
		//	*(void**)obj = _freeList;
		//	_freeList = obj;
		//}

		//头插(上面两种情况对于有没有结点都适用,可以合并为一种情况)
		*(void**)obj = _freeList;    //void*在32位下是4字节,在64位下是8字节
		_freeList = obj;
	}

private:
	char* _memory = nullptr;   //指向大块内存的指针
	size_t _remainBytes = 0;   //大块内存在切分过程中剩余字节数

	void* _freeList = nullptr;   //还回来过程中链接的自由链表的头指针
};

3.5 性能对比

  下面将实现的定长内存池和malloc/free进行性能对比,测试代码如下:

struct TreeNode
{
	int _val;
	TreeNode* _left;
	TreeNode* _right;
	TreeNode()
		:_val(0)
		, _left(nullptr)
		, _right(nullptr)
	{}
};

void TestObjectPool()
{
	// 申请释放的轮次
	const size_t Rounds = 3;
	// 每轮申请释放多少次
	const size_t N = 100000;
	size_t begin1 = clock();
	std::vector<TreeNode*> v1;
	v1.reserve(N);

	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v1.push_back(new TreeNode);
		}
		for (int i = 0; i < N; ++i)
		{
			delete v1[i];
		}
		v1.clear();
	}

	size_t end1 = clock();
	ObjectPool<TreeNode> TNPool;
	size_t begin2 = clock();
	std::vector<TreeNode*> v2;
	v2.reserve(N);

	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v2.push_back(TNPool.New());
		}
		for (int i = 0; i < N; ++i)
		{
			TNPool.Delete(v2[i]);
		}
		v2.clear();
	}

	size_t end2 = clock();
	cout << "new cost time:" << end1 - begin1 << endl;
	cout << "object pool cost time:" << end2 - begin2 << endl;
}

  我们先用new申请若干个TreeNode对象,然后再用delete将这些对象再释放,通过clock函数得到整个过程消耗的时间。(new和delete底层就是封装的malloc和free)
  然后再重复该过程,只不过将其中的new和delete替换为定长内存池当中的New和Delete,此时再通过clock函数得到该过程消耗的时间。
在这里插入图片描述
  可以看到在这个过程中,定长内存池消耗的时间比malloc/free消耗的时间要短。这就是因为malloc是一个通用的内存池,而定长内存池是专门针对申请定长对象而设计的,因此在这种特殊场景下定长内存池的效率更高。

四、高并发内存池整体框架设计

4.1 该项目解决的问题

  现在很多的开发环境都是多核多线程的,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀了,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题

  • 1、性能问题
  • 2、多线程环境下,锁竞争问题
  • 3、内存碎片问题

4.2 整体框架结构

  concurrent memory pool主要由以下3个部分构成:

在这里插入图片描述

1、thread cache:线程缓存是每个线程独有的,用于小于256KB内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方
2、central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象,central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache没有内存对象时才会找central cache,所以这里竞争不会很激烈
3、page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。

4.3 各部分的主要作用

  thread cache主要解决锁竞争的问题,每个线程独享自己的thread cache,当自己的thread cache中有内存时该线程不会去和其他线程进行竞争,每个线程只要在自己的thread cache申请内存就行了。
  central cache主要起到一个居中调度的作用,每个线程的thread cache需要内存时从central cache获取,而当thread cache的内存多了就会将内存还给central cache,其作用类似于一个中枢,因此取名为中心缓存。
  page cache就负责提供以页为单位的大块内存,当central cache需要内存时就会去向page cache申请,而当page cache没有内存了就会直接去找系统,也就是直接去堆上按页申请内存块。

五、threadcache

5.1 threadcache整体设计

  定长内存池只支持固定大小内存块的申请释放,因此定长内存池中只需要一个自由链表管理释放回来的内存块。现在要支持申请和释放不同大小的内存块,那么就需要多个自由链表来管理释放回来的内存块,因此thread cache实际上是一个哈希桶结构,每个桶中存放的都是一个自由链表。
  thread cache支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失。
  这时我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如我们让这些字节数都按照8字节进行向上对齐,那么thread cache的结构就是下面这样的,此时当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推。

在这里插入图片描述  因此当线程要申请某一大小的内存块时,就需要经过某种计算得到对齐后的字节数,进而找到对应的哈希桶,如果该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回;如果该自由链表已经为空,那么就需要向下一层的central cache进行获取了。
  但此时由于对齐的原因,就可能会产生一些碎片化的内存无法被利用,比如线程只申请了6字节的内存,而thread cache却直接给了8字节的内存,这多给出的2字节就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内碎片。

  鉴于当前项目比较复杂,最好对自由链表这个结构进行封装,目前我们就提供Push和Pop两个成员函数,对应的操作分别是将对象插入到自由链表(头插)和从自由链表获取一个对象(头删),后面在需要时还会添加对应的成员函数。

申请内存(Pop)

1、当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
2、如果自由链表_freeList[i]中有对象,则直接Pop一个内存对象返回。
3、如果_freeList[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。

释放内存(Push)

1、当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
2、当链表的长度过长,则回收一部分内存对象到central cache。

//管理切分好的小对象的自由链表
class FreeList
{
public:
	void Push(void* obj)   //插入对象 (一个对象)
	{
		assert(obj);

		//头插
		//*(void**)obj = _freeList;
		NextObj(obj) = _freeList;
		_freeList = obj;

		++_size;
	}

	void* Pop()    //删除对象
	{
		assert(_freeList);

		//头删
		void* obj = _freeList;
		_freeList = NextObj(obj);

		--_size;

		return obj;
	}
	
private:
	void* _freeList=nullptr;
	size_t _size=0;
};

  因此,thread cache实际就是一个数组,数组中存储的就是一个个的自由链表,至于这个数组中到底存储了多少个自由链表,就需要看我们在进行字节数对齐时具体用的是什么映射对齐规则了。

5.2 threadcache哈希桶映射对齐规则

5.2.1 映射对齐规则

  首先,这些内存块是会被链接到自由链表上的,因此一开始肯定是按8字节进行对齐是最合适的,因为我们必须保证这些内存块,无论是是在32位平台下还是64位平台下,都至少能够存储得下一个指针。
  但如果所有的字节数都按照8字节进行对齐的话,那么就需要建立256×1024÷8=32768个桶,这个数量还是比较多的,实际上我们可以让不同范围的字节数按照不同的对齐数进行对齐,具体对齐方式如下:

字节数对齐数哈希桶下标
[1,128]8[0,16)
[128+1,1024]16[16,72)
[1024+1,8×1024]128[72,128)
[8×1024+1,64×1024]1024[128,184)
[64×1024+1,256×1024]8×1024[184,208)

5.2.2 空间浪费率

  虽然对齐产生的内碎片会引起一定程度的空间浪费,但按照上面的对齐规则,我们可以将浪费率控制到百分之十左右。需要说明的是,1~128这个区间我们不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率,这里我们就从第二个区间开始进行计算。

计算空间浪费率:空间浪费率 = 浪费的字节数 ÷ 对齐后的字节数

  根据上面的公式,我们要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。比如129~1024这个区间,该区域的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,也就是144,那么该区间的最大浪费率也就是15÷144≈10.42%。同样的道理,后面两个区间的最大浪费率分别是127÷1152≈11.02%和1023÷9216≈11.10%。

5.2.3 对齐和映射相关函数的编写

  此时有了字节数的对齐规则后,我们就需要提供两个对应的函数,分别用于获取某一字节数对齐后的字节数,以及该字节数对应的哈希桶下标。关于处理对齐和映射的函数,我们可以将其封装到一个类当中。

//计算对象大小的对齐映射规则
class SizeClass
{
public:
	//整体控制在最多10%左右的内碎片浪费
	//[1,128]                  8byte对齐         freelist[0,16)
	//[128+1,1024]             16byte对齐        freelist[16,72)
	//[1024+1,8*1024]          128byte对齐       freelist[72,128)
	//[8*1024+1,64*1024]       1024byte对齐      freelist[128,184)
	//[64*1024+1,256*1024]     8*1024byte对齐    freelist[184,208)

	/*size_t _RoundUp(size_t size, size_t alignNum)   //alignNum表示多少字节对齐数
	{
		size_t alignSize;
		if (size % alignNum != 0)
		{
			alignSize = (size / alignNum + 1) * alignNum;
		}
		else
		{
			alignSize = size;
		}

		return alignSize;
	}*/


	//1--8
	//15&~7
	//7   000111
	//~7  111000
	//8--15与~7相&最后的结果都是8
	//位运算写法
	static inline size_t _RoundUp(size_t bytes, size_t alignNum)
	{
		return ((bytes+ alignNum - 1) & ~(alignNum - 1));
	}


	static inline size_t RoundUp(size_t size)   //计算对象采用多少字节对齐
	{
		if (size <= 128)
		{
			return _RoundUp(size, 8);
		}
		else if (size <= 1024)
		{
			return _RoundUp(size, 16);
		}
		else if (size <= 8 * 1024)
		{
			return _RoundUp(size, 128);
		}
		else if (size <= 64 * 1024)
		{
			return _RoundUp(size, 1024);
		}
		else if (size <= 256 * 1024)
		{
			return _RoundUp(size, 8*1024);
		}
		else
		{
			return _RoundUp(size, 1<<PAGE_SHIFT);
		}
	}

	/*size_t _Index(size_t bytes, size_t alignNum)
	{
		if (bytes % alignNum == 0)
		{
			return bytes / alignNum - 1;
		}
		else
		{
			return bytes / alignNum;
		}
	}*/


	//1--8   + 7  =8--15
	//9--16  + 7  =16--23
	
	//位运算写法
	static inline size_t _Index(size_t bytes, size_t align_shift)      //align_shift表示左移的转换位数
	{
		return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
	}

	//计算映射的哪一个自由链表桶(一共5个自由链表桶)
	static inline size_t Index(size_t bytes)
	{
		assert(bytes <= MAX_BYTES);

		//每个区间有多少链
		static int group_array[4] = { 16,56,56,56 };
		if (bytes <= 128)
		{
			return _Index(bytes, 3);
		}
		else if (bytes <= 1024)
		{
			return _Index(bytes - 128, 4) + group_array[0];
		}
		else if (bytes <= 8 * 1024)
		{
			return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
		}
		else if (bytes <= 64 * 1024)
		{
			return _Index(bytes - 8*1024, 10) + group_array[2] + group_array[1] + group_array[0];
		}
		else if (bytes <= 256 * 1024)
		{
			return _Index(bytes - 64*1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
		}
		else
		{
			assert(false);
		}

		return -1;
	}
};

  需要注意的是,SizeClass类当中的成员函数最好设置为静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数。
  在获取某一字节数向上对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行处理。
  可以通过一般的写法通过对齐数计算出某一字节数对齐后的字节数,也可以通过位运算的方式进行计算,因为计算机执行位运算的速度是比执行乘法和除法更快。

  在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理。
  可以通过一般的写法根据对齐数来计算某一字节数对应的下标。为了提高效率,也可以用位运算的方法来解决,需要注意的时,此时我们并不是传入该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3。

5.2.4 ThreadCache类

  按照上述的对齐规则,thread cache中桶的个数,也就是自由链表的个数是208,以及thread cache允许申请的最大内存大小256KB,我们可以将这些数据按照如下方式进行定义:

//小于等于MAX_BYTES,就找thread cache申请
//大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;

  现在就可以对ThreadCache类进行定义了,thread cache就是一个存储208个自由链表的数组,目前thread cache就先提供一个Allocate函数用于申请对象就行了,Deallocate函数用于释放内存对象,FetchFromCentralCache函数用户从中心缓存获取对象,ListTooLong函数用于链表过长,回收内存回到中心缓存。

class ThreadCache
{
public:
	//申请和释放内存对象
	void* Allocate(size_t size);

	void Deallocate(void* ptr, size_t size);

	//从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);

	//释放对象时,链表过长时,回收内存回到中心缓存
	void ListTooLong(FreeList& list, size_t size);

private:
	FreeList _freeLists[NFREELIST];
};

  在thread cache申请对象时,通过所给字节数计算出对应的哈希桶下标,如果桶中自由链表不为空,则从该自由链表中取出一个对象进行返回即可;但如果此时自由链表为空,那么我们就需要从central cache进行获取了,这里的FetchFromCentralCache函数和ListTooLong函数也是thread cache类中的一个成员函数,在后面再进行具体实现。

void* ThreadCache::Allocate(size_t size)      //申请内存对象
{
	assert(size <= MAX_BYTES);
	size_t alignSize = SizeClass::RoundUp(size);
	size_t index = SizeClass::Index(size);

	if (!_freeLists[index].Empty())
	{
		return _freeLists[index].Pop();
	}
	else
	{
		return FetchFromCentralCache(index, alignSize);
	}
}


void ThreadCache::Deallocate(void* ptr, size_t size)   //释放内存对象
{
	assert(ptr);
	assert(size <= MAX_BYTES);

	//找对应映射的自由链表桶,对象插入进去
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);

	//当链表长度大于一次批量申请的内存时就开始还一段list给central cache
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
	{
		ListTooLong(_freeLists[index], size);
	}

}

5.3 threadcache TLS无锁访问

  每个线程都有一个自己独享的thread cache,那应该如何创建呢?我们不能将这个thread cache创建为全局的,因为全局变量是所有线程共享的,这样就不可避免的需要锁来控制,增加了控制成本和代码复杂度。
  要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。

// TLS thread local storage (线程本地存储)  
//是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到
//这样就保持了数据的线程独立性
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

  但不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache,因此在申请内存的函数中会包含以下逻辑:

if (pTLSThreadCache == nullptr)
{
	pTLSThreadCache = new ThreadCache;
}	
return pTLSThreadCache->Allocate(size);		

六、centralcache

6.1 centralcache整体设计

  当线程申请某一大小的内存时,如果thread cache中对应的自由链表不为空,那么直接取出一个内存块进行返回即可,但如果此时该自由链表为空,那么这时thread cache就需要向central cache申请内存了。
  central cache的结构和thread cache是一样的,它们都是哈希桶的结构,并且它们遵循的对齐映射规则都是一样的。这样做的好处就是,当thread cache的某个桶中没有内存了,就可以直接到central cache中对应的哈希桶里去内存就行了。

6.1.1 central cache与thread cache的不同之处

  central cache与thread cache有两个明显不同的地方,首先,thread cache是每个线程独享的,而central cache是所有线程共享的,因为每个线程的thread cache没有内存了都会去找central cache,因此在访问central cache时是需要加锁的。
  但central cache在加锁时并不是将整个central cache全部锁上,central cahce在加锁时用的是桶锁,也就是说每个桶都有一个锁。此时只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争,如果是多个线程同时访问central cache的不同桶就不会存在锁竞争。
  central cache与thread cache的第二个不同之处就是,thread cache的每个桶挂的是一个个切好的内存块,而central cache的每个桶中挂的是一个个的span。central cache也是一个哈希桶结构,它的哈希桶的映射关系跟thread cache是一样的。不同的是它的每个哈希桶位置挂的是SpanList链表结构,不过每个映射桶下面的span中的大内存块按映射关系被切成了一个个小内存块对象挂在span的自由链表中。

在这里插入图片描述
  每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个span里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶这些内存块被切成了对应的大小。

6.2 centralcache结构设计

6.2.1 页号的类型

  每个程序运行起来后都有自己的进程地址空间,在32位平台下,进程地址空间的大小是 2 32 2^{32} 232;而在64位平台下,进程地址空间的大小就是 2 64 2^{64} 264
  页的大小一般是4K或者8K,我们以8K为例。在32位平台下,进程地址空间就可以被分成 2 32 2^{32} 232÷ 2 13 2^{13} 213= 2 19 2^{19} 219个页;在64位平台下,进程地址空间就可以被分成 2 64 2^{64} 264÷ 2 13 2^{13} 213= 2 51 2^{51} 251个页。页号本质与地址是一样的,它们都是一个编号,只不过地址是以一个字节为一个单位,而页是以多个字节为一个单位。
  由于页号在64位平台下的取值范围是[0, 2 51 2^{51} 251),因此我们不能简单的用一个无符号整型来存储页号,这时我们需要借助条件编译来解决这个问题。

#ifdef _WIN64
	typedef unsigned long long PAGE_ID;
#elif _WIN32
	typedef size_t PAGE_ID;
#else
	// linux
#endif  

  注意:在32位平台下,_WIN32有定义,_WIN64没有定义;而在64位平台下,_WIN32和_WIN64都有定义。因此在条件编译时,我们应该先判断_WIN64是否有定义,再判断_WIN32是否有定义。

6.2.2 span的结构

  central cache的每个桶挂的是一个个的span,span是一个管理以页为单位的大块内存,span的结构如下:

//管理多个连续页大块内存跨度结构
struct Span
{
	PAGE_ID _pageId=0;   //大块内存起始页的页号
	size_t _n=0;         //页的数量

	Span* _next=nullptr;       //双向链表的结构
	Span* _prev=nullptr;   

	size_t _objSize = 0;    //切好的小对象的大小
	size_t _useCount=0;  //切好的小块内存,被分配给thread cache的计数
	void* _freeList=nullptr;   //切好的小块内存的自由链表

	bool _isUse = false;      //是否在被使用
};

  对于span管理的以页为单位的大块内存,我们需要知道这块内存具体在哪一个位置,便于之后page cache进行前后页的合并,因此span结构当中会记录所管理大块内存起始页的页号。
  至于每一个span管理的到底是多少个页,这并不是固定的,需要根据多方面的因素来控制,因此span结构当中有一个_n成员,该成员就代表着该span管理的页的数量。
  此外,每个span管理的大块内存,都会被切成相应大小的内存块挂到当前span的自由链表中,比如8Byte哈希桶中的span,会被切成一个个8Byte大小的内存块挂到当前span的自由链表中,因此span结构中需要存储切好的小块内存的自由链表。
  span结构当中的_useCount成员,记录的就是当前span中切好的小块内存,被分配给thread cache的计数,当某个span的_useCount计数变为0时,代表当前span切出去的内存块对象全部还回来了,此时central cache就可以将这个span再还给page cache。
  每个桶当中的span是以双链表的形式组织起来的,当我们需要将某个span归还给page cache时,就可以很方便的将该span从双链表结构中移出。如果用单链表结构的话就比较麻烦了,因为单链表在删除时,需要知道当前结点的前一个结点。

6.2.3 双链表结构

  central cache的每个哈希桶里面存储的都是一个双链表结构,对于该双链表结构我们可以对其进行封装。

申请内存
1、当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的SpanList,SpanList中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
2、central cache映射的SpanList中所有span都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存块按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
3、central cache中挂的span中的_useCount记录分配了多少个对象出去,分配一个对象给thread cache,就++_useCount

释放内存
1、当thread cache过长或者线程销毁,则会将内存释放回central cache中,释放回来时- -_useCount。当_useCount减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。

//带头双向循环链表
class SpanList
{
public:
	SpanList()
	{
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	}

	void Insert(Span* pos, Span* newSpan)
	{
		assert(pos);
		assert(newSpan);

		Span* prev = pos->_prev;
		//prev newspan pos
		prev->_next = newSpan;
		newSpan->_prev = prev;
		newSpan->_next = pos;
		pos->_prev = newSpan;
	}

	void Erase(Span* pos)
	{
		assert(pos);
		assert(pos != _head);
		Span* prev = pos->_prev;
		Span* next = pos->_next;

		prev->_next = next;
		next->_prev = prev;
	}

private:
	Span* _head;
public:
	std::mutex _mtx;  //桶锁
};

  注意:从双链表删除的span会还给下一层的page cache,相当于只是把这个span从双链表中移除,因此不需要对删除的span进行delete操作。

6.2.4 central cache的结构

  central cache的映射规则和thread cache是一样的,因此central cache里面哈希桶的个数也是208,但central cache每个哈希桶中存储的就是我们上面定义的双链表结构。

class CentralCache
{
public:
	//...
private:
	SpanList _spanLists[NFREELISTS];
};

  central cache和thread cache的映射规则一样,有一个好处就是,当thread cache的某个桶没有内存了,就可以直接去central cache对应的哈希桶进行申请就行了。

6.3 centralcache核心实现

6.3.1 central cache的实现方式

  每个线程都有一个属于自己的thread cache,我们是用TLS来实现每个线程无锁的访问属于自己的thread cache。而central cache和page cache在整个进程中只有一个,对于这种只能创建一个对象的类,我们可以将其设置为单例模式。
  单例模式可以保证系统中该类只有一个实例对象,并提供一个访问它的全局访问函数,该实例被所有程序模块共享。单例模式又分为饿汉模式和懒汉模式。懒汉模式相对较复杂,这里使用饿汉模式就足够了。

//单例模式---饿汉模式 (程序启动时创建一个唯一的实例对象)
class CentralCache
{
public:
	static CentralCache* GetInstance()
	{
		return &_sInst;
	}

private:
	SpanList _spanLists[NFREELIST];

private:
	CentralCache()
	{}

	CentralCache(const CentralCache&) = delete;

	static CentralCache _sInst;
};

  为了保证CentralCache类只能创建一个对象,我们需要将central cache的构造函数和拷贝构造函数设置为私有,或者在C++11中也可以在函数声明的后面加上=delete进行修饰。
  CentralCache类当中还需要有一个CentralCache类型的静态成员变量,当程序运行起来后就立马创建该对象,在此后的程序中就只有这一个单例了。
  最后central cache还需要提供一个公有的成员函数,用于获取该对象,此时在整个进程中就只会有一个central cache对象了。

6.3.2 慢开始反馈调节算法

  当thread cache向central cache申请内存时,central cache应该给出多少个对象呢?如果central cache给的太少,那么thread cache在短时间内用完了又会来申请;但如果一次性给的太多了,可能thread cache用不完也就浪费了。
  鉴于此,我们这里采用了一个慢开始反馈调节算法。当thread cache向central cache申请内存时,如果申请的是较小的对象,那么可以多给一点,但如果申请的是较大的对象,就可以少给一点。
  通过下面的函数,我们就可以根据所需申请的对象的大小计算出具体给出的对象个数,并且可以将给出的对象个数控制到2~512个之间。也就是说,就算thread cache要申请的对象再小,最多一次性给出512个对象;就算thread cache要申请的对象再大,最少一次性给出2个对象。

class SizeClass
{
public:
	//一次thread cache从中心缓存获取多少个对象
	static size_t NumMoveSize(size_t size)
	{
		/*if (size == 0)
			return 0;*/

		assert(size > 0);

		//[2,512],一次批量移动多少个对象的(慢启动)上限值
		//小对象一次批量上限高
		//小对象一次批量上限低 
		int num = MAX_BYTES / size;
		if (num < 2)
			num = 2;

		if (num > 512)
			num = 512;

		return num;
	}
};

  但就算申请的是小对象,一次性给出512个也是比较多的,基于这个原因,我们可以在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1,并且提供一个公有成员函数用于获取这个变量。也就是说,现在thread cache中的每个自由链表都会有一个自己的_maxSize。

class FreeList
{
public:
	size_t& MaxSize()
	{
		return _maxSize;
	}
private:
	void* _freeList=nullptr;
	size_t _maxSize = 1;
};

  此时当thread cache申请对象时,我们会比较_maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。此外,如果本次采用的是_maxSzie的值,那么还会将thread cache中该自由链表的_maxSize的值进行加一。
  因此,thread cache第一次向central cache申请某大小的对象时,申请到的都是一个,但下一次thread cache再向central cache申请同样大小的对象时,因为该自由链表中的_maxSIze增加了,最终就会申请到两个。直到该自由链表中_maxSIze的值,增长到超过计算出的值后就不会继续增长了,此后申请到的对象个数就是计算出的个数。

6.3.3 从中心缓存获取对象

  每次thread cache向central cache申请对象时,先通过慢开始反馈调节算法计算出本次应该申请的对象的个数,然后再向central cache进行申请。
  如果thread cache最终申请到对象的个数就是一个,那么直接将该对象返回即可。**为什么需要返回一个申请到的对象呢?**因为thread cache要向central cache申请对象,其实由于某个线程向thread cache申请对象但thread cache当中没有,这才导致thread cahe要向central cache申请对象。因此central cache将对象返回给thread cache后,thread cache会再将该对象返回给申请对象的线程。
  但如果thread cache最终申请到的是多个对象,那么除了将第一个对象返回之外,还需要将剩下的对象挂到thread cache对应的哈希桶当中。

void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
	//慢开始反馈调节算法
	//1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
	//2、如果你不断有这个size大小内存需求,那么batchNum就会不断增长,直到上限
	//3、size越大,一次向central cache要的batchNum就越小
	//4、size越小,一次向central cache要的batchNum就越大
	size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
	if (_freeLists[index].MaxSize() == batchNum)
	{
		_freeLists[index].MaxSize() += 1;
	}

	void* start = nullptr;
	void* end = nullptr;
	size_t actualNum=CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
	assert(actualNum > 0);

	if (actualNum == 1)
	{
		assert(start == end);
		return start;
	}
	else
	{
		_freeLists[index].PushRange(NextObj(start), end,actualNum-1);
		return start;
	}
	
}

6.3.4 从中心缓存获取一定数量对象

  这里我们要从central cache获取n个指定大小的对象,这些对象肯定都是从central cache对应哈希桶的某个span中取出来的,因此取出来的这n个对象是链接在一起的,我们只需要得到这段链表的头和尾即可,这里采用输出型参数进行获取。

//从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();

	Span* span = GetOneSpan(_spanLists[index], size);
	assert(span);
	assert(span->_freeList);

	//从span中获取batchNum个对象
	//如果不够batchNum个,有多少拿多少
	start = span->_freeList;
	end = start;
	size_t i = 0;
	size_t actualNum = 1;
	while (i < batchNum - 1&&NextObj(end)!=nullptr)
	{
		end = NextObj(end);
		++i;
		++actualNum;
	}
	span->_freeList = NextObj(end);
	NextObj(end) = nullptr;
	span->_useCount += actualNum;

	_spanLists[index]._mtx.unlock();
	return actualNum;
}

  由于central cache是所有线程共享的,所以我们在访问central cache中的哈希桶时,需要先给对应的哈希桶加上桶锁,在获取到对象后面再将桶锁解掉。
  在向central cache获取对象时,先是在central cache对应的哈希桶中获取到一个非空的span,然后从这个span的自由链表中取出n个对象即可,但可能这个非空的span自由链表当中对象的个数不足n个,这时该自由链表当中有多少个对象就给多少个对象就行了。
  也就是说,thread cache实际从central cache获得的对象个数可能与我们传入的n值是不一样的,因此我们需要统计本次申请过程中,实际thread cache获取到的对象个数,然后根据该值及时更新这个span的小对象被分配给thread cache的计数。
  注意:虽然我们实际申请到对象的个数可能比n要小,但这并不会产生任何影响。因为thread cache的本意就是向centreal cache申请一个对象,我们之所以要一次多申请一些对象,是因为这样一来下次线程再申请相同大小的对象时就可以直接在thread cache里面获取了,而不用再向central cahe申请对象。

6.3.5 插入或删除一段范围的自由链表对象

  此外,如果thread cache最终从central cache获取到的对象个数是大于一的,那么我们还需要将剩下的对象插入到thread cache中对应的哈希桶中,为了能够让自由链表支持插入一段范围的对象或删除一段范围的对象,我们还需要在FreeList类中增加一个对应的成员函数。

//管理切分好的小对象的自由链表
class FreeList
{
public:
	void PushRange(void* start, void* end,size_t n)  //(多个对象的插入)
	{
		NextObj(end) = _freeList;
		_freeList = start;

		_size += n;
	}

	void PopRange(void*& start, void*& end, size_t n)
	{
		assert(n <= _size);
		start = _freeList;
		end = start;

		for (size_t i = 0; i < n - 1; ++i)
		{
			end = NextObj(end);
		}

		_freeList = NextObj(end);
		NextObj(end) = nullptr;
		_size -= n;
	}
private:
	void* _freeList=nullptr;
	size_t _maxSize = 1;
	size_t _size=0;
};

七、pagecache

7.1 pagecache整体设计

page cache与central cache结构的相同之处
1、page cache与central cache一样,它们都是哈希桶的结构,并且page cache的每个哈希桶中挂的也是一个个的span,这些span也是按照双链表的结构链接起来的。

page cache与central cache结构的不同之处
1、首先,central cache的映射规则与thread cache保持一致,而page cache的映射规则与它们都不相同。page cache的哈希桶映射规则采用的是直接定址法,比如,1号桶挂的都是1页的span,2号桶挂的都是2页的span,以此类推。
2、其次,central cache每个桶中的span被切成了一个个对应大小的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,而如何切分申请到的这个span就应该由central cache自己来决定。

在这里插入图片描述
  至于page cache当中究竟有多少个桶,这就要看你最大想挂几页的span了,这里我们就最大挂128页的span,为了让桶号与页号对应起来,我们可以将第0号桶空出来不用,因此我们需要将哈希桶的个数设置尾129。

//page cache中哈希桶的个数
static const size_t NPAGES = 129;

  为什么这里最大挂128页的span呢?因为线程申请单个对象最大是256KB,而128页可以被切分成4个256KB的对象,因此是足够的。当然,如果你想在page cache中挂更大的span也是可以的,根据具体的需求进行设置就行了。

7.1.1 在page cache获取一个n页的span过程

  如果central cache要获取一个n页的span,那我们就可以在page cache的第n号桶中取出一个span返回给central cache即可,但如果第n号桶中没有span了,这时我们并不是直接转向堆申请一个n页的span,而是继续在后面的桶当中寻找span。
  直接向堆申请以页为单位的内存时,我们应该尽量申请大块一点的内存块,因为此时申请到的内存是连续的,当线程需要内存时我们可以将其切小后分配给线程,而当线程将内存释放后我们又可以将其合并成大块的连续内存。如果我们向堆申请内存时是小块小块的申请的,那么我们申请到的内存就不一定是连续的了。
  因此,当第n号桶中没有span时,我们可以继续找第n+1号桶,因为我们将n+1页的span切分成一个n页的span和一个1页的span,这时我们就可以将n页的span返回,而将切分后1页的span挂到1号桶中。但如果后面的桶中都没有span,这时我们就只能向堆申请一个128页的内存块,并将其用一个span结构管理起来,然后将128页的span切分成n页的span和128-n页的span,其中n页的span返回给central cache,而128-n页的span挂到第128-n号桶中。
  也就是说,我们每次向堆申请的都是128页大小的内存块,central cache要的这些span实际都是由128页的span切分出来的。

7.1.2 page cache的实现方式

  当每个线程的thread cache没有内存时都会向central cache申请,此时多个线程的thread cache如果访问的不是central cache的同一个桶,那么这些线程是可以同时进行访问的。这时central cache的多个桶就可能同时向page cache申请内存的,所以page cache也是存在线程安全问题的,因此在访问page cache时也必须要加锁。
  但是在page cache这里我们不能使用桶锁,因为当central cache向page cache申请内存时,page cache可能会将其他桶当中大页的span切小后再给central cache。此外,当central cache将某个span归还给page cache时,page cache也会尝试将该span与其他桶当中的span进行合并。
  也就是说,在访问page cache时,我们可能需要访问page cache中的多个桶,如果page cache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此我们在访问page cache时没有使用桶锁,而是用一个大锁将整个page cahe给锁住。
  而thread cache在访问central cache时,只需要访问central cache中对应的哈希桶就行了,因为central cache的每个哈希桶中的span都被切分成了对应大小,thread cache只需要根据自己所需对象的大小访问central cache中对应的哈希桶即可,不会访问其他哈希桶,因此central cache可以用桶锁。
  此外,page cache在整个进程中也是只能存在一个的,因此我们也需要将其设置为单例模式。

//单例模式---饿汉模式 (程序启动时创建一个唯一的实例对象)
class PageCache
{
public:
	static PageCache* GetInstance()
	{
		return &_sInst;
	}

private:
	PageCache()
	{}

	PageCache(const PageCache&) = delete;

	static PageCache _sInst;
};

  当程序运行起来后我们就立马创建该对象即可。

7.2 pagecache中获取Span

申请内存
1、当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
2、如果找到_SpanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
3、需要注意的是,central cache和page cache的核心结构都是SpanList的哈希桶,但是它们是有本质区别的,central cache中哈希桶,是按照thread cache一样的大小对齐关系映射的,它的SpanList中挂的span中的内存都按照映射关系被切好链接成小块内存的自由链表。而page cache中的SpanList则是按下标桶号映射的,也就是说第i号桶中挂的span都是第i页的内存。

释放内存
1、如果central cache释放回一个span,则一次寻找span的前后page id没有使用的空闲span,看是否可以合并,如果合并就继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片

7.2.1 获取一个非空的span

  thread cache向central cache申请对象时,central cache需要先从对应的哈希桶中获取到一个非空的span,然后从这个非空的span中取出若干对象返回给thread cache。那central cache到底是如何从对应的哈希桶中,获取到一个非空的span的呢?
  首先当然是先遍历central cache对应哈希桶当中的双链表,如果该双链表中有非空的span,那么直接将该span进行返回即可。为了方便遍历这个双链表,我们可以模拟迭代器的方式,给SpanList类提供Begin和End成员函数,分别用于获取双链表中的第一个span和最后一个span的下一个位置,也就是头结点。

class SpanList
{
public:
	Span* Begin()
	{
		return _head->_next;
	}

	Span* End()
	{
		return _head;
	}
private:
	Span* _head;
public:
	std::mutex _mtx;  //桶锁
};

  但如果遍历双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就需要向page cache申请内存块了。
  那具体是向page cache申请多大的内存块呢?我们可以根据所需对象的大小来决定,就像之前我们根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,现在我们是根据对象的大小计算出,central cache一次应该向page cache申请几页的内存块。
  我们可以先根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,然后将这个上限值乘以单个对象的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数,如果转换后不够一页,那么我们就申请一页,否则转换出来是几页就申请几页。也就是说,central cache向page cache申请内存时,要求申请到的内存尽量能够满足thread cache向central cache申请时的上限。

class SizeClass
{
public:	
	//计算一次向系统获取几个页
	//单个对象 8byte
	//...
	//单个对象 256KB
	static size_t NumMovePage(size_t size)
	{
		size_t num = NumMoveSize(size);
		size_t npage = num * size;

		npage >>= PAGE_SHIFT;    //*8K就是右移13位
		if (npage == 0)
			npage = 1;

		return npage;
	}
};

  代码中的PAGE_SHIFT代表页大小转换偏移,我们这里以页的大小为8K为例,PAGE_SHIFT的值就是13。

//页大小转换偏移,即一页定义为2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;

  注意:当central cache申请到若干页的span后,还需要将这个span切成一个个对应大小的对象挂到该span的自由链表当中。
  如何找到一个span所管理的内存块呢?首先需要计算出该span的起始地址,我们可以用这个span的起始页号乘以一页的大小即可得到这个span的起始地址,然后用这个span的页数乘以一页的大小就可以得到这个span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置。
  明确了这块内存的起始和结束位置后,我们就可以进行切分了。根据所需对象的大小,每次从大块内存切出一块固定大小的内存块尾插到span的自由链表中即可。
  为什么是尾插呢?因为我们如果是将切好的对象尾插到自由链表,这些对象看起来是按照链式结构链接起来的,而实际它们在物理上是连续的,这时当我们把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存利用率。

//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
	//查看当前的spanlist中是否还有未分配对象的span
	Span* it = list.Begin();
	while (it != list.End())
	{
		if (it->_freeList != nullptr)
		{
			return it;
		}
		else
		{
			it = it->_next;
		}
	}

	//先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
	list._mtx.unlock();

	//走到这里说明没有空闲span了,只能找page cache要
	PageCache::GetInstance()->_pageMtx.lock();
	Span* span=PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
	
	span->_isUse = true;
	span->_objSize = size;

	PageCache::GetInstance()->_pageMtx.unlock();

	//对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span

	//计算span的大块内存的起始地址和大块内存的大小(字节数)
	char* start = (char*)(span->_pageId << PAGE_SHIFT);
	size_t bytes = span->_n << PAGE_SHIFT;
	char* end = start + bytes;

	//把大块内存切成自由链表链接起来
	//1、先切一块下来去做头,方便尾插
	span->_freeList = start;
	start += size;
	void* tail = span->_freeList;

	int i = 1;
	while (start < end)
	{
		++i;
		NextObj(tail) = start;
		tail = NextObj(tail);    //tail=start;
		start += size;
	}

	NextObj(tail) = nullptr;

	//切好span以后,需要把span挂到桶里面去的时候,再加锁
	list._mtx.lock();
	list.PushFront(span);

	return span;
}

  注意:当我们把span切好后,需要将这个切好的span挂到central cache的对应哈希桶中。因此SpanList类还需要提供一个接口,用于将一个span插入到该双链表中。这里我们选择头插,这样当central cache下一次从该双链表中获取非空span时,一来就能找到。
  由于SpanList类之前实现了Insert、Erase和Begin、End函数,这里实现双链表头插和头删就非常简单,直接在双链表的Begin位置进行Insert和Erase即可。

//带头双向循环链表
class SpanList
{
public:
	void PushFront(Span* span)
	{
		Insert(Begin(), span);
	}

	Span* PopFront()
	{
		Span* front = _head->_next;
		Erase(front);
		return front;
	}
private:
	Span* _head;
public:
	std::mutex _mtx;  //桶锁
};

7.2.2 获取一个k页的span

  当调用上述的GetOneSpan从central cache的某个哈希桶获取一个非空的span时,如果遍历哈希桶中的双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就需要向page cache申请若干页的span了。那么如何从page cache获取一个k页的span呢
  因为page cache是直接按照页数进行映射的,因此我们要从page cache获取一个k页的span,就应该直接先去找page cache的第k号桶,如果第k号桶中有span,那我们直接头删一个span返回给central cache就行了。所以这里需要再给SpanList类添加对应的Empty和PopFront函数。

//带头双向循环链表
class SpanList
{
public:
	bool Empty()
	{
		return _head == _head->_next;
	}
	Span* PopFront()
	{
		Span* front = _head->_next;
		Erase(front);
		return front;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶锁
};

  如果page cache的第k号桶中没有span,我们就应该继续找后面的桶,只要后面任意一个桶中有一个n页的span,我们就可以将其切分成一个k页的span和一个n-k页的span,然后将切出来k页的span返回给central cache,再将n-k页的span挂到page cache的第n-k号桶即可。
  但如果后面的桶中都没有span,此时我们就需要向堆申请一个128页的span了,在向堆申请内存时,直接调用我们封装的SystemAlloc函数即可。
  注意:向堆申请内存后得到的是这块内存的起始地址,此时我们需要将该地址转换为页号。由于我们向堆申请内存时都是按页进行申请的,因此我们直接将该地址除以一页的大小即可得到对应的页号。

//获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
	//assert(k > 0 && k < NPAGES);
	assert(k > 0);

	//大于128 page的直接向堆申请
	if (k > NPAGES - 1)
	{
		void* ptr = SystemAlloc(k);
		Span* span = new Span;

		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;

		_idSpanMap[span->_pageId] = span;

		return span;
	}


	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		return _spanLists[k].PopFront();

		//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; ++i)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}

	//检查一下后面的桶里面有没有span,如果有可以把它进行切分
	for (size_t i = k + 1; i < NPAGES; ++i)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;

			//在nSpan的头部切一个k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;

			_spanLists[nSpan->_n].PushFront(nSpan);
			//存储nSpan的首尾页码跟nSpan映射,
			//方便page cache回收内存时进行的合并查找
			_idSpanMap[nSpan->_pageId] = nSpan;

			//1000  + 5-1=1004
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; ++i)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}

	//走到这个位置就说明后面没有大页的span了
	//这是就去找堆要一个128页的span
	Span* bigSpan = new Span;

	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	return NewSpan(k);
}

  说明:当我们向堆申请到128页的span后,需要将其切分成k页的span和128-k页的span,但是为了尽量避免出现重复的代码,我们最好不要再编写对应的切分代码。我们可以先将申请到的128页的span挂到page cache对应的哈希桶中,然后再递归调用该函数就行了,此时再往后找span时就一定会在第128号桶中找到该span,然后进行切分。
  当central cache向page cache申请内存时,central cache对应的哈希桶是处于加锁状态的,那么访问page cache之前我们应不应该把central cache对应的桶锁解掉呢
  这里建议在访问page cache前,先把central cache对应的桶锁解掉。虽然此时central cache的这个桶当中是没有内存供其他thread cache申请的,但thread cachec除了申请内存还会释放内存,如果在访问page cache前将central cache对应的桶锁解掉,那么此时当其他thread cache想要归还内存到central cache的这个桶时就不会被阻塞。
  因此在调用NewSpan函数之前,我们需要先将central cache对应的桶锁解掉,然后再将page cache的大锁加上,当申请到k页的span后,我们需要将page cache的大锁解掉,但此时我们不需要立刻获取到central cache中对应的桶锁。因为central cache拿到k页的span后还会对其进行切分操作,因此我们可以在span切好后需要将其挂到central cache对应的桶上时,再获取对应的桶锁。具体逻辑在之前实现的GetOneSpan函数中已经体现。

八、申请内存过程联调

8.1 ConcurrentAlloc函数

  在将thread cache、central cache以及page cache的申请流程写通了之后,我们就可以向外提供一个ConcurrentAlloc函数,用于申请内存块。每个线程第一次调用该函数时会通过TLS获取到自己专属的thrad cache对象,然后每个线程就通过自己对应的thread cache申请对象了。

static void* ConcurrentAlloc(size_t size)
{
	//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
	if (pTLSThreadCache == nullptr)
	{
		pTLSThreadCache = new ThreadCache;
	}
	return pTLSThreadCache->Allocate(size);
}

  注意:在编译时,C++的algorithm头文件中有一个min函数,这是一个函数模板,而在Windows.h头文件中也有一个min,这是一个宏。由于调用函数模板时需要进行参数类型的推演,因此当我们调用min函数时,编译器会优先匹配Windows.h当中以宏的形式实现的min,此时当我们以std::min的形式调用min函数时就会产生报错,这就是没有用命名空间进行封装的坏处,这时我们只能选择将std::去掉,让编译器调用Windows.h当中的min。

8.2 申请内存过程联调测试(上)

  由于在多线程场景下调试观察起来非常麻烦,这里就先不考虑多线程场景,看看在单线程场景下代码的执行逻辑是否符合我们的预期。其次,我们这里只简单观察在一个桶当中的内存申请就行了。

void TestConcurrentAlloc1()
{
	void* p1 = ConcurrentAlloc(6);
	void* p2 = ConcurrentAlloc(8);
	void* p3 = ConcurrentAlloc(1);
	void* p4 = ConcurrentAlloc(7);
	void* p5 = ConcurrentAlloc(8);

	cout << p1 << endl;
	cout << p2 << endl;
	cout << p3 << endl;
	cout << p4 << endl;
	cout << p5 << endl;
}

在这里插入图片描述

  申请的内存字节数最终都对齐到了8,此时当线程申请内存时就只会访问到thread cache的第0号桶。当线程第一次申请内存时,该线程需要通过TLS获取到自己专属的thread cache对象,然后通过这个thread cache对象进行内存申请。
  在申请内存时通过计算索引到了thread cache的第0号桶,但此时thread cache的第0号桶中是没有对象的,因此thread cache需要向central cache申请内存块。
  在向central cache申请内存块前,首先通过NumMoveSize函数计算得出,thread cache一次最多可向central cache申请8字节大小对象的个数是512,但由于我们采用的是慢开始算法,因此还需要将上限值与对应自由链表的_maxSize的值进行比较,而此时对应自由链表_maxSize的值是1,所以最终得出本次thread cache向central cahe申请8字节对象的个数是1个。
  并且在此之后会将该自由链表中_maxSize的值进行自增,下一次thread cache再向central cache申请8字节对象时最终申请对象的个数就会是2个了。
  在thread cache 向central cache申请对象之前,需要先将central ache的0号桶的锁加上,然后再从该桶获取一个非空的span。
  在central cache的第0号桶获取非空span时,先遍历对应的span双链表,看看有没有非空的span,但此时肯定是没有的,因此在这个过程中我们无法找到一个非空的span。
  那么此时central cache就需要向page cache申请内存了,但在此之前需要先把central cahe第8号桶的锁解掉,然后再将page cache的大锁给加上,之后才能向page cache申请内存。
  在向page cache申请内存时,由于central cache一次给thread cache8字节对象的上限是512,对应就需要4096字节,所需字节数不足一页就按一页算,所以这里central cache就需要向page cache申请一页的内存块。
  但此时page cache的第1个桶以及之后的桶当中都是没有span的,因此page cache需要直接向堆申请一个128页的span。
  现在将申请到的128页的span插入到page cache的第128号桶中,然后再调用一次NewSpan,在这次调用的时候,虽然在1号桶当中没有span,但是在往后找的过程中就一定会在第128号桶找到一个span。
  此时我们就可以把这个128页的span拿出来,切分成1页的span和127页的span,将1页的span返回给central cache,而把127页的span挂到page cache的第127号桶即可。
  从page cache返回后,就可以把page cache的大锁解掉了,但紧接着还要将获取到的1页span进行切分,因此这里没有立刻重新加上central cache对应的桶锁。
  在进行切分的时候,先通过该span的起始页号得到该span起始地址,然后通过该span的页数得到该span所管理内存块的总字节数。
  在确定内存块的开始和结束后,就可以将其切分成一个个8字节大小的对象挂到该span的自由链表中。在调试过程中通过内存监视窗口可以看到,切分出来的每个8字节大小的对象的前4个字节存储的都是下一个8字节对象的起始地址。
  当切分结束后再获取central cached第0号桶的桶锁,然后将这个切好的span插入到central cache的第0号桶中,最后再将这个非空的span返回,此时就获取到了一个非空的span。
  由于thread cache只向central cache上申请了一个对象,因此拿到这个非空的span后,直接从这个span里面取出一个对象即可,此时该span的_useCount也由0变成了1。
  由于此时thread cache实际只向central cache申请到了一个对象,因此直接将这个对象返回给线程即可。
  当线程第二次申请内存块就不会再创建thread cache了,直接获取到对应的thread cache进行内存块申请就行了,重复以上步骤,不同的是,central cache和page cache中已经有内存了,不用再向堆申请了,提高了效率。

8.3 申请内存过程联调测试(下)

  为了进一步测试代码的正确性,让线程申请1024次8字节的对象,然后通过调试观察在第1025次申请时,central cache是否会再向page cache申请内存块

//测试申请释放1024个8byte内不等的几个对象的申请,这1024次申请把第一个页分完了
//打断点,通过调试观察第1025次申请,在page cache去切一个新span,验证前1024次申请和切分逻辑
void TestConcurrentAlloc2()
{
	for (size_t i = 0; i < 1024; ++i)
	{
		void* p1 = ConcurrentAlloc(6);
		cout << p1 << endl;
	}

	void* p2 = ConcurrentAlloc(8);
	cout << p2 << endl;
}

  因为central cache第一次就是向page cache申请的一页内存,这一页内存被切成了1024个8字节大小的对象,当这1024个对象全部被申请之后,再申请8字节大小的对象时central cache当中就没有对象了,此时就应该像page cache申请内存块。
  在第1025次申请8字节大小的对象时,central cache第0号桶中的这个span的_useCount已经增加到了1024,也就是说这1024个对象都已经被线程申请了,此时central cache就需要再向page cache申请一页的span来进行切分了。
  而这次central cache在向page cache申请一页的内存时,page cache就是将127页span切分成了1页的span和126页的span了,然后central cache拿到这1页的span后,又会将其切分成1024块8字节大小的内存块以供thread cache申请。

九、threadcache回收内存

  当某个线程申请的对象不用了,可以将其释放给thread cache,然后thread cache将该对象插入到对应哈希桶的自由链表当中即可。
  但是随着线程不断的释放,对应自由链表的长度也会越来越长,这些内存堆积在一个thread ache中就是一种浪费,应该将这些内存还给central cache,这样一来,这些内存对其他线程来说也是可以申请的,因此当thread cache某个桶当中的自由链表太长时我们可以进行一些处理。
  如果thread cache某个桶当中自由链表的长度超过它一次批量向central cache申请的对象个数,那么此时我们就要把该自由链表当中的这些对象还给central cache。

void ThreadCache::Deallocate(void* ptr, size_t size)   //释放内存对象
{
	assert(ptr);
	assert(size <= MAX_BYTES);

	//找对应映射的自由链表桶,对象插入进去
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);

	//当链表长度大于一次批量申请的内存时就开始还一段list给central cache
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
	{
		ListTooLong(_freeLists[index], size);
	}
}

  当自由链表的长度大于一次批量申请的对象时,我们的做法是:从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给central cache中对应的span即可。

void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
	void* start = nullptr;
	void* end = nullptr;
	list.PopRange(start, end, list.MaxSize());

	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

  从上面可以看出,FreeList类需要支持用Size函数获取自由链表中对象的个数,还需要支持用PopRange函数从自由链表中取出指定个数的对象。因此我们需要给FreeList类增加一个对应的PopRange函数,然后再增加一个_size成员变量,该成员变量用于记录当前自由链表中对象的个数,当我们向自由链表插入或删除对象时,都应该更新_size的值。

//管理切分好的小对象的自由链表
class FreeList
{
public:
	void Push(void* obj)   //插入对象 (一个对象)
	{
		assert(obj);

		//头插
		//*(void**)obj = _freeList;
		NextObj(obj) = _freeList;
		_freeList = obj;

		++_size;
	}

	void PushRange(void* start, void* end,size_t n)  //(多个对象的插入)
	{
		NextObj(end) = _freeList;
		_freeList = start;
		
		_size += n;
	}

	void PopRange(void*& start, void*& end, size_t n)
	{
		assert(n <= _size);
		start = _freeList;
		end = start;

		for (size_t i = 0; i < n - 1; ++i)
		{
			end = NextObj(end);
		}

		_freeList = NextObj(end);
		NextObj(end) = nullptr;
		_size -= n;
	}

	void* Pop()    //删除对象
	{
		assert(_freeList);

		//头删
		void* obj = _freeList;
		_freeList = NextObj(obj);

		--_size;

		return obj;
	}

	bool Empty()
	{
		return _freeList == nullptr;
	}

	size_t& MaxSize()
	{
		return _maxSize;
	}

	size_t Size()
	{
		return _size;
	}

private:
	void* _freeList=nullptr;
	size_t _maxSize = 1;
	size_t _size=0;
};

  而对于FreeList类当中的PushRange成员函数,我们最好也像PopRange一样给它增加一个参数,表示插入对象的个数,不然我们这时还需要通过遍历统计插入对象的个数。

十、centralcache回收内存

  当thread cache中某个自由链表太长时,会将自由链表当中的这些对象还给central cache中的span。
  注意:还给central cache的这些对象不一定都是属于同一个span的。central cache中的每个哈希桶中可能都不止一个span,因此当我们计算出还回来的对象应该还给central cache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪个span。

如何根据对象的地址得到对象所在的页号
  首先我们必须要理解的是,某个页当中的所有地址除以页的大小都等于该页的页号。比如我们假设一页的大小是100,那么地址0~99都属于第0页,它们除以100都等于0,而地址100~199都属于第1页,它们除以100都等于1。

如何找到一个对象对应的span
  虽然我们现在可以通过对象的地址得到其所在的页号,但是我们还是不能知道这个对象到底属于哪一个span。因为一个span管理的可能是多个页。
  为了解决这个问题,我们可以建立页号和span之间的映射。由于这个映射关系在page cache进行span的合并时也需要用到,因此我们直接将其存放到page cache里面。这时我们就需要在PageCache类中添加一个映射关系了,这里可以用C++当中的unordered_map进行实现,并且添加一个函数接口,用于让central cache获取这里的映射关系。

//单例模式
class PageCache
{
public:
	//获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);
private:
	std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};

  每当page cache分配span给central cache时,都需要记录以下页号和span之间的映射关系。此后当thread cache还对象给central cache时,才知道应该具体还给哪个span。
  因此当central cache在调用NewSpan接口向page cache申请k页的span时,page cache在返回这个k页的span给central cache之前,应该建立这k个页号与该span之间的映射关系。

//获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
	//assert(k > 0 && k < NPAGES);
	assert(k > 0);

	//大于128 page的直接向堆申请
	if (k > NPAGES - 1)
	{
		void* ptr = SystemAlloc(k);
		Span* span = new Span;

		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;

		_idSpanMap[span->_pageId] = span;

		return span;
	}


	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		return _spanLists[k].PopFront();

		//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; ++i)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}

	//检查一下后面的桶里面有没有span,如果有可以把它进行切分
	for (size_t i = k + 1; i < NPAGES; ++i)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;

			//在nSpan的头部切一个k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;

			_spanLists[nSpan->_n].PushFront(nSpan);
			//存储nSpan的首尾页码跟nSpan映射,
			//方便page cache回收内存时进行的合并查找
			_idSpanMap[nSpan->_pageId] = nSpan;

			//1000  + 5-1=1004
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; ++i)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}

	//走到这个位置就说明后面没有大页的span了
	//这是就去找堆要一个128页的span
	Span* bigSpan = new Span;

	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	return NewSpan(k);
}

  此时就可以通过对象的地址找到该对象对应的span了,直接将该对象的地址除以页的大小得到页号,然后在unordered_map当中找到其对应的span即可。

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

  注意:当我们通过某个页号查找其对应的span时,该页号与其span之间的映射一定是建立过的,如果此时没有在unordered_map当中找到,则说明之前的代码逻辑有问题。

回收内存
  这时当thread cache还对象给central cache时,就可以依次遍历这些对象,将这些对象插入到其对应span的自由链表当中,并且及时更新该span的_useCount计数即可。
  在thread cache还对象给central cache的过程中,如果central cache中某个span的_useCount减到0时,说明这个span分配出去的对象全部还回来了,那么此时就可以将这个span再进一步还给page cache。

void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();
	while (start)
	{
		void* next = NextObj(start);

		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		NextObj(start) = span->_freeList;
		span->_freeList = start;
		span->_useCount--;

		//说明span的切分出去的所有小块内存都回来了
		//这个span就可以再回收给page cache, pagecache可以再尝试去做前后页的合并
		if (span->_useCount == 0)
		{
			_spanLists[index].Erase(span);
			span->_freeList = nullptr;
			span->_next = nullptr;
			span->_prev = nullptr;

			//释放span给page cache时,使用page cache的锁就可以了
			//这时把桶锁解掉
			_spanLists[index]._mtx.unlock();

			PageCache::GetInstance()->_pageMtx.lock();
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock();

			_spanLists[index]._mtx.lock();
		}
		start = next;
	}

	_spanLists[index]._mtx.unlock();
}

  注意:如果要把某个span还给page cache,我们需要先将这个span从central cache对应的双链表中移除,然后再将该span的自由链表置空,因为page cache中的span是不需要切分成一个个的小对象的,以及该span的前后指针也都应该置空,因为之后要将其插入到page cache对应的双链表中。但span当中记录的起始页号以及它管理的页数是不能清除的,否则对应内存块就找不到了。
  并且在central cache还span给page cache时也存在锁的问题,此时需要先将central cache中对应的桶锁解掉,然后再加上page cache的大锁之后才能进入page cache进行相关操作,当处理完毕回到central cache时,除了将page cache的大锁解掉,还需要立刻获得central cache对应的桶锁,然后将还未还完的对象继续还给central cache中对应的span。

十一、pagecache回收内存

  如果central cache中有某个span的_useCount减到0了,那么central cache就需要将这个span还给page cache了。
  这个过程看似是非常简单的,pagecache只需将还回来的span挂到对应的哈希桶上就行了。但实际为了缓解内存碎片的问题,page cache还需要尝试将还回来的span与其他空间的span进行合并。

11.1 page cache进行前后页的合并

  合并的过程可以分为向前合并和向后合并。如果还回来的span的起始页号是num,该span所管理的页数是n。那么在向前合并时,就需要判断第num-1页对应span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止。而在向后合并时,就需要判断第num+n页对应的span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止。
  因此page cache在合并span时,是需要通过页号获取到对应的span的,这就是我们要把页号与span之间的映射关系存储到page cache的原因。
  但需要注意的是,当我们通过页号找到其对应的span时,这个span此时可能挂在page cache,也可能挂在central cache。而在合并时我们只能合并挂在page cache的span,因为挂在central cache的span中的对象正在被其他线程使用。
  可是我们不能通过span结构中的_useCount成员,来判断某个span到底是在central cache还是在page cache。因为当central cache刚向page cache申请到一个span时,这个span的_useCount就是等于0的,这时可能当我们正在对该span进行切分的时候,page cache就把这个span拿去进行合并了,这显然是不合理的。
  鉴于此,可以在span结构中再增加一个_isUse成员,用于标记这个span是否正在被使用,而当一个span结构被创建时我们默认该span是没有被使用的。

//管理多个连续页大块内存跨度结构
struct Span
{
	PAGE_ID _pageId=0;   //大块内存起始页的页号
	size_t _n=0;         //页的数量

	Span* _next=nullptr;       //双向链表的结构
	Span* _prev=nullptr;   

	size_t _objSize = 0;    //切好的小对象的大小
	size_t _useCount=0;  //切好的小块内存,被分配给thread cache的计数
	void* _freeList=nullptr;   //切好的小块内存的自由链表

	bool _isUse = false;      //是否在被使用
};

  因此,当central cache向page cache申请到一个span时,需要立即将该span的_isUse改为true。
  而当central cache将某个span还给page cache时,也就需要将该span的_isUse改成false。
  由于在合并page cache中的span时,需要通过页号找到其对应的span,而一个span是在被分配给central cache时,才建立的各个页号与span之间的映射关系,因此page cache当中的span也需要建立页号与span之间的映射关系。
  与central cache中的span不同的是,在page cache中,只需建立一个span的首尾页号与该span之间的映射关系。因为当一个span在尝试进行合并时,如果是往前合并,那么只需要通过一个span的尾页找到这个span,如果是向后合并,那么只需要通过一个span的首页找到这个span。
  当我们申请k页的span时,如果是将n页的span切成了一个k页的span和一个n-k页的span,我们除了需要建立k页span中每个页与该span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系。

//获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
	//assert(k > 0 && k < NPAGES);
	assert(k > 0);

	//大于128 page的直接向堆申请
	if (k > NPAGES - 1)
	{
		void* ptr = SystemAlloc(k);
		Span* span = new Span;

		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;

		_idSpanMap[span->_pageId] = span;

		return span;
	}


	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		return _spanLists[k].PopFront();

		//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; ++i)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}

	//检查一下后面的桶里面有没有span,如果有可以把它进行切分
	for (size_t i = k + 1; i < NPAGES; ++i)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;

			//在nSpan的头部切一个k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;

			_spanLists[nSpan->_n].PushFront(nSpan);
			//存储nSpan的首尾页码跟nSpan映射,
			//方便page cache回收内存时进行的合并查找
			_idSpanMap[nSpan->_pageId] = nSpan;

			//1000  + 5-1=1004
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; ++i)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}

	//走到这个位置就说明后面没有大页的span了
	//这是就去找堆要一个128页的span
	Span* bigSpan = new Span;

	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	return NewSpan(k);
}

  此时page cache当中的span就都与其首尾页之间建立了映射关系,现在我们就可以进行span的合并了。

void PageCache::ReleaseSpanToPageCache(Span* span)
{
	//大于128 page的直接还给堆
	if (span->_n > NPAGES - 1)
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		delete span;

		return;
	}

	//对span前后的页,尝试进行合并,缓解内存碎片问题
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		auto ret = _idSpanMap.find(prevId);
		//前面的页号没有,不合并了
		if (ret == _idSpanMap.end())
		{
			break;
		}

		//前面相邻页的span在使用,不合并了
		Span* prevSpan = ret->second;
		if (prevSpan->_isUse==true)
		{
			break;
		}

		//合并出超过128页的span没办法管理,不合并了
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}

		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		_spanLists[prevSpan->_n].Erase(prevSpan);
		delete prevSpan;
	}

	//向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		auto ret = _idSpanMap.find(nextId);
		if (ret==_idSpanMap.end())
		{
			break;
		}

		Span* nextSpan = ret->second;
		if (nextSpan->_isUse == true)
		{
			break;
		}

		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}

		span->_n += nextSpan->_n;

		_spanLists[nextSpan->_n].Erase(nextSpan);
		delete nextSpan;
	}

	_spanLists[span->_n].PushFront(span);
	span->_isUse = false;

	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_n - 1] = span;
}

  注意

  • 如果没有通过页号获取到其对应的span,说明对应到该页的内存块还未申请,此时需要停止合并。
  • 如果通过页号获取了其对应的span,但该span处于被使用的状态,那我们也必须停止合并。
  • 如果合并后大于128页则不能进行本次合并,因为page cache无法对大于128页的span进行管理。

  在合并span时,由于这个span是在page cache的某个哈希桶的双链表当中的,因此在合并后需要将其从对应的双链表中移除,然后再将这个被合并了的span结构进行delete。
  除此之外,在合并结束后,除了将合并后的span挂到page cache对应哈希桶的双链表当中,还需要建立该span与其首位页之间的映射关系,便于此后合并出更大的span。

十二、释放内存过程联调

12.1 ConcurrentFree函数

  此时我们可以向外提供一个ConcurrentFree函数,用于释放内存块,释放内存块时每个线程通过自己的thread cache对象,调用thread cache中释放内存对象的接口即可。

static void ConcurrentFree(void* ptr, size_t size/*暂时*/)
{
	assert(pTLSThreadCache);

	pTLSThreadCache->Deallocate(ptr, size);
}

十三、大于256KB的大块内存申请问题

13.1 申请过程

  之前说到,每个线程的thread cache是用于申请小于等于256KB的内存的,而对于大于256KB的内存,我们可以考虑直接向page cache申请,但page cache中最大的页也就只有128页,因此如果是大于128页的内存申请,就只能直接向堆申请了。

申请内存的大小申请方式
x ≤ 256KB(32页)向thread cache申请
32页 < x ≤ 128页向page cache申请
x ≥ 128页向堆申请
当申请的内存大于256KB时,虽然不是从thread cache进行获取,但在分配内存时也是需要进行向上对齐的,对于大于256KB的内存我们直接按页进行对齐

  而我们之前实现RoundUp函数时,对传入字节数大于256KB的情况直接做了断言处理,因此这里需要对RoundUp函数稍作修改。

static inline size_t RoundUp(size_t size)   //计算对象采用多少字节对齐
	{
		if (size <= 128)
		{
			return _RoundUp(size, 8);
		}
		else if (size <= 1024)
		{
			return _RoundUp(size, 16);
		}
		else if (size <= 8 * 1024)
		{
			return _RoundUp(size, 128);
		}
		else if (size <= 64 * 1024)
		{
			return _RoundUp(size, 1024);
		}
		else if (size <= 256 * 1024)
		{
			return _RoundUp(size, 8*1024);
		}
		else
		{
			return _RoundUp(size, 1<<PAGE_SHIFT);
		}
	}

  现在对于之前的申请逻辑就需要进行修改了,当申请对象的大小大于256KB时,就不用向thread cache申请了,这时先计算出按页对齐后实际需要申请的页数,然后通过调用NewSpan申请指定页数的span即可。

static void* ConcurrentAlloc(size_t size)
{
	if (size > MAX_BYTES)
	{
		size_t alignSize = SizeClass::RoundUp(size);
		size_t kpage = alignSize >> PAGE_SHIFT;

		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kpage);
		span->_objSize = size;
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		return ptr;
	}
	else
	{
		//通过TLS (Thread Local Storage 线程本地存储) 每个线程无锁的获取自己的专属ThreadCache对象
		if (pTLSThreadCache == nullptr)
		{
			static ObjectPool<ThreadCache> tcPool;
			//pTLSThreadCache = new ThreadCache;
			pTLSThreadCache = tcPool.New();

		}

		cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;

		return pTLSThreadCache->Allocate(size);
	}
}

  也就是说,申请大于256KB的内存时,会直接调用page cache当中的NewSpan函数进行申请,因此这里我们需要再对NewSpan函数进行改造,当需要申请的内存页数大于128页时,就直接向堆申请对应页数的内存块。而如果申请的内存页数是小于128页的,那就在page cache中进行申请,因此当申请大于256KB的内存调用NewSpan函数时也是需要加锁的,因为我们可能是在page cache中进行申请的。

//获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
	//assert(k > 0 && k < NPAGES);
	assert(k > 0);

	//大于128 page的直接向堆申请
	if (k > NPAGES - 1)
	{
		void* ptr = SystemAlloc(k);
		//Span* span = new Span;
		Span* span = _spanPool.New();

		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;

		//_idSpanMap[span->_pageId] = span;
		_idSpanMap.set(span->_pageId, span);

		return span;
	}

	//先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		//return _spanLists[k].PopFront();
		Span* kSpan= _spanLists[k].PopFront();

		//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; ++i)
		{
			//_idSpanMap[kSpan->_pageId + i] = kSpan;
			_idSpanMap.set(kSpan->_pageId + i, kSpan);
		}

		return kSpan;
	}

	//检查一下后面的桶里面有没有span,如果有可以把它进行切分
	for (size_t i = k + 1; i < NPAGES; ++i)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			//Span* kSpan = new Span;
			Span* kSpan = _spanPool.New();

			//在nSpan的头部切一个k页下来
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;

			_spanLists[nSpan->_n].PushFront(nSpan);
			//存储nSpan的首尾页码跟nSpan映射,
			//方便page cache回收内存时进行的合并查找
			//_idSpanMap[nSpan->_pageId] = nSpan;
			_idSpanMap.set(nSpan->_pageId , nSpan);

			//1000  + 5-1=1004
			//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
			_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);


			//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; ++i)
			{
				//_idSpanMap[kSpan->_pageId + i] = kSpan;
				_idSpanMap.set(kSpan->_pageId + i, kSpan);
			}

			return kSpan;
		}
	}

	//走到这个位置就说明后面没有大页的span了
	//这是就去找堆要一个128页的span
	//Span* bigSpan = new Span;
	Span* bigSpan = _spanPool.New();

	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	return NewSpan(k);
}

13.2 释放过程

  当释放对象时,我们需要判断释放对象的大小:

释放内存的大小释放方式
x ≤ 256KB(32页)释放给thread cache
32页 < x ≤ 128页释放给page cache
x ≥ 128页释放给堆

  因此当释放对象时,我们需要先找到该对象对应的span,但是在释放对象时我们只知道该对象的起始地址。这也就是我们在申请大于256KB的内存时,也要给申请到的内存建立span结构,并建立起始页号与该span之间的映射关系的原因。此时我们就可以通过释放对象的起始地址计算出起始页号,进而通过页号找到该对象对应的span。

//static void ConcurrentFree(void* ptr,size_t size)
static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;

	if (size > MAX_BYTES)
	{
		//Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);

		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

  因此page cache在回收span时也需要进行判断,如果该span的大小是小于等于128页的,那么直接还给page cache进行了,page cache会尝试对其进行合并。而如果该span的大小是大于128页的,那么说明该span是直接向堆申请的,我们直接将这块内存释放给堆,然后将这个span结构进行delete就行了。

void PageCache::ReleaseSpanToPageCache(Span* span)
{
	//大于128 page的直接还给堆
	if (span->_n > NPAGES - 1)
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);

		return;
	}

	//对span前后的页,尝试进行合并,缓解内存碎片问题
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		//auto ret = _idSpanMap.find(prevId);
		前面的页号没有,不合并了
		//if (ret == _idSpanMap.end())
		//{
		//	break;
		//}

		auto ret = (Span*)_idSpanMap.get(prevId);
		if (ret == nullptr)
		{
			break;
		}

		//前面相邻页的span在使用,不合并了
		//Span* prevSpan = ret->second;
		Span* prevSpan = ret;
		if (prevSpan->_isUse==true)
		{
			break;
		}

		//合并出超过128页的span没办法管理,不合并了
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}

		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		_spanLists[prevSpan->_n].Erase(prevSpan);
		//delete prevSpan;
		_spanPool.Delete(prevSpan);
	}

	//向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		/*auto ret = _idSpanMap.find(nextId);
		if (ret==_idSpanMap.end())
		{
			break;
		}*/

		auto ret = (Span*)_idSpanMap.get(nextId);
		if (ret == nullptr)
		{
			break;
		}

		//Span* nextSpan = ret->second;
		Span* nextSpan = ret;
		if (nextSpan->_isUse == true)
		{
			break;
		}

		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}

		span->_n += nextSpan->_n;

		_spanLists[nextSpan->_n].Erase(nextSpan);
		//delete nextSpan;
		_spanPool.Delete(nextSpan);
	}

	_spanLists[span->_n].PushFront(span);
	span->_isUse = false;

	//_idSpanMap[span->_pageId] = span;
	//_idSpanMap[span->_pageId + span->_n - 1] = span;

	_idSpanMap.set(span->_pageId, span);
	_idSpanMap.set(span->_pageId + span->_n - 1, span);
}

  注意:直接向堆申请内存时我们调用的接口是VirtualAlloc,与之对应的将内存释放给堆的接口叫做VirtualFree,而Linux下的brk和mmap对应的释放接口叫做sbrk和unmmap。此时我们也可以将这些释放接口封装成一个叫做SystemFree的接口,当我们需要将内存释放给堆时直接调用SystemFree即可。

十四、使用定长内存池配合脱离使用new

  tcmalloc是要在高并发场景下替代malloc进行内存申请的,因此tcmalloc在实现的时,其内部是不能调用malloc函数的,我们当前的代码中存在通过new获取到的内存,而new在底层实际上就是封装了malloc。
  为了完全脱离掉malloc函数,此时我们之前实现的定长内存池就起作用了,代码中使用new时基本都是为Span结构的对象申请空间,而span对象基本都是在page cache层创建的,因此我们可以在PageCache类当中定义一个_spanPool,用于span对象的申请和释放。

//单例模式
class PageCache
{
public:
	//...
private:
	ObjectPool<Span> _spanPool;
};

  然后将代码中使用new的地方替换为调用定长内存池当中的New函数,将代码中使用delete的地方替换为调用定长内存池当中的Delete函数。

//申请span对象
Span* span = _spanPool.New();
//释放span对象
_spanPool.Delete(span);

  注意:当使用定长内存池当中的New函数申请Span对象时,New函数通过定位new也是对Span对象进行初始化的。
  此外,每个线程第一次申请内存时都会创建其专属的thread cache,而这个thread cache目前也是new出来的,我们也需要对其进行替换。

//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
	static std::mutex tcMtx;
	static ObjectPool<ThreadCache> tcPool;
	tcMtx.lock();
	pTLSThreadCache = tcPool.New();
	tcMtx.unlock();
}

  这里我们将用于申请ThreadCache类对象的定长内存池定义为静态的,保持全局只有一个,让所有线程创建自己的thread cache时,都在这个定长内存池中申请内存就行了。
  但注意在从该定长内存池中申请内存时需要加锁,防止多个线程同时申请自己的ThreadCache对象而导致线程安全问题。
  最后在SpanList的构造函数中也用到了new,因为SpanList是带头循环双向链表,所以在构造期间我们需要申请一个span对象作为双链表的头结点。

//带头双向循环链表
class SpanList
{
public:
	SpanList()
	{
		_head = _spanPool.New();
		_head->_next = _head;
		_head->_prev = _head;
	}
private:
	Span* _head;
	static ObjectPool<Span> _spanPool;
};

  由于每个span双链表只需要一个头结点,因此将这个定长内存池定义为静态的,保持全局只有一个,让所有span双链表在申请头结点时,都在一个定长内存池中申请内存就行了。

十五、释放对象时优化为不传对象大小

  当我们使用malloc函数申请内存时,需要指明申请内存的大小;而当我们使用free函数释放内存时,只需要传入指向这块内存的指针即可。
  而我们目前实现的内存池,在释放对象时除了需要传入指向该对象的指针,还需要传入该对象的大小。

原因

  • 如果释放的是大于256KB的对象,需要根据对象的大小来判断这块内存到底应该还给page cache,还是应该直接还给堆。
    如果释放的是小于等于256KB的对象,需要根据对象的大小计算出应该还给thread cache的哪一个哈希桶。

  如果我们也想做到,在释放对象时不用传入对象的大小,那么我们就需要建立对象地址与对象大小之间的映射。由于现在可以通过对象的地址找到其对应的span,而span的自由链表中挂的都是相同大小的对象。因此我们可以在Span结构中再增加一个_objSize成员,该成员代表着这个span管理的内存块被切成的一个个对象的大小。

struct Span
{
	PAGE_ID _pageId=0;   //大块内存起始页的页号
	size_t _n=0;         //页的数量

	Span* _next=nullptr;       //双向链表的结构
	Span* _prev=nullptr;   

	size_t _objSize = 0;    //切好的小对象的大小
	size_t _useCount=0;  //切好的小块内存,被分配给thread cache的计数
	void* _freeList=nullptr;   //切好的小块内存的自由链表

	bool _isUse = false;      //是否在被使用
};

  而所有的span都是从page cache中拿出来的,因此每当我们调用NewSpan获取到一个k页的span时,就应该将这个span的_objSize保存下来。

Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_objSize = size;

  代码中有两处,一处是在central cache中获取非空span时,如果central cache对应的桶中没有非空的span,此时会调用NewSpan获取一个k页的span;另一处是当申请大于256KB内存时,会直接调用NewSpan获取一个k页的span。
  此时当我们释放对象时,就可以直接从对象的span中获取到该对象的大小,准确来说获取到的是对齐以后的大小。

static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;

	if (size > MAX_BYTES)
	{
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

15.1 读取映射关系时的加锁问题

  我们将页号与span之间的映射关系是存储在PageCache类当中的,当我们访问这个映射关系时是需要加锁的,因为STL容器是不保证线程安全的。
  对于当前代码来说,如果我们此时正在page cache进行相关操作,那么访问这个映射关系是安全的,因为当进入page cache之前是需要加锁的,因此可以保证此时只有一个线程在进行访问。
  但如果我们是在central cache访问这个映射关系,或是在调用ConcurrentFree函数释放内存时访问这个映射关系,那么就存在线程安全的问题。因为此时可能其他线程正在page cache当中进行某些操作,并且该线程此时可能也在访问这个映射关系,因此当我们在page cache外部访问这个映射关系时是需要加锁的。
  实际就是在调用page cache对外提供访问映射关系的函数时需要加锁,这里我们可以考虑使用C++当中的unique_lock,当然你也可以用普通的锁。

Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);

	std::unique_lock<std::mutex> lock(_pageMtx); //构造时加锁,析构时自动解锁

	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

十六、多线程环境下对比malloc测试

  之前我们只是对代码进行了一些基础的单元测试,下面我们在多线程场景下对比malloc进行测试。

// ntimes 一轮申请和释放内存的次数
// rounds 轮次
// nworks 线程数
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(malloc(16));
					//v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc "
		<< ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;

	cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次free "
		<< ntimes << "次: 花费:" << free_costtime << "ms" << endl;

	cout << nworks << "个线程并发执行" << rounds << "malloc&free "
		<< nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << "ms" << endl;
}

// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;


	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(ConcurrentAlloc(16));
					//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次concurrent alloc "
		<< ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;

	cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次concurrent dealloc "
		<< ntimes << "次: 花费:" << free_costtime << "ms" << endl;

	cout << nworks << "个线程并发执行" << rounds << "concurrent alloc&dealloc "
		<< nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << "ms" << endl;
}

int main()
{
	size_t n = 100000;
	cout << "==========================================================" <<endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	//BenchmarkConcurrentMalloc(n, 1, 10);
	cout << endl << endl;

	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<endl;
	return 0;
}

  在测试函数中,我们通过clock函数分别获取到每轮次申请和释放所花费的时间,然后将其对应累加到malloc_costtime和free_costtime上。最后我们就得到了,nworks个线程跑rounds轮,每轮申请和释放ntimes次,这个过程申请所消耗的时间、释放所消耗的时间、申请和释放总共消耗的时间。
  注意,我们创建线程时让线程执行的是lambda表达式,而我们这里在使用lambda表达式时,以值传递的方式捕捉了变量k,以引用传递的方式捕捉了其他父作用域中的变量,因此我们可以将各个线程消耗的时间累加到一起。
  我们将所有线程申请内存消耗的时间都累加到malloc_costtime上, 将释放内存消耗的时间都累加到free_costtime上,此时malloc_costtime和free_costtime可能被多个线程同时进行累加操作的,所以存在线程安全的问题。鉴于此,我们在定义这两个变量时使用了atomic类模板,这时对它们的操作就是原子操作了。

16.1 固定大小内存的申请和释放

v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16));

在这里插入图片描述

  此时4个线程执行10轮操作,每轮申请释放10000次,总共申请释放了40万次,运行后可以看到,malloc的效率还是更高的。
  由于此时我们申请释放的都是固定大小的对象,每个线程申请释放时访问的都是各自thread cache的同一个桶,当thread cache的这个桶中没有对象或对象太多要归还时,也都会访问central cache的同一个桶。此时central cache中的桶锁就不起作用了,因为我们让central cache使用桶锁的目的就是为了,让多个thread cache可以同时访问central cache的不同桶,而此时每个thread cache访问的却都是central cache中的同一个桶。

16.2 不同大小内存的申请和释放

v.push_back(malloc((16 + i) % 8192 + 1));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));

在这里插入图片描述

  运行后可以看到,由于申请和释放内存的大小是不同的,此时central cache当中的桶锁就起作用了,ConcurrentAlloc的效率也有了较大增长,但相比malloc来说还是差一点点。

十七、复杂问题的调试技巧

  多线程调试比单线程调试要复杂得多,调试时各个线程之间会相互切换,并且每次调试切换的时机也是不固定的,这就使得调试过程变得非常难以控制。

17.1 条件断点

  一般情况下我们可以直接运行程序,通过报错来查找问题。如果此时报的是断言错误,那么我们可以直接定位到报错的位置,然后将此处的断言改为与断言条件相反的if判断,在if语句里面打上一个断点,但注意空语句是无法打断点的,这时我们随便在if里面加上一句代码就可以打断点了。
在这里插入图片描述
  此外,条件断点也可以通过右击普通断点来进行设置。右击后即可设置相应的条件,程序运行到此处时如果满足该条件则会停下来。运行到条件断点处后,我们就可以对当前程序进行进一步分析,找出断言错误的被触发原因。

17.2 查看函数栈帧

  当程序运行到断点处时,我们需要对当前位置进行分析,如果检查后发现当前函数是没有问题的,这时可能需要回到调用该函数的地方进行进一步分析,此时我们可以依次点击“调试→窗口→调用堆栈”。
在这里插入图片描述
  此时我们就可以看到当前函数栈帧的调用情况,其中黄色箭头指向的是当前所在的函数栈帧。双击函数栈帧中的其他函数,就可以跳转到该函数对应的栈帧,此时浅灰色箭头指向的就是当前跳转到的函数栈帧。需要注意的是,监视窗口只能查看当前栈帧中的变量。如果要查看此时其他函数栈帧中变量的情况,就可以通过函数栈帧跳转来查看。

17.3 疑似死循环时中断程序

  当你在某个地方设置断点后,如果迟迟没有运行到断点处,而程序也没有崩溃,这时有可能是程序进入到某个死循环了。这时我们可以依次点击“调试→全部中断”。这时程序就会在当前运行的地方停下来。

在这里插入图片描述

十八、性能瓶颈分析

  经过前面的测试可以看到,我们的代码此时与malloc之间还是有差距的,此时我们就应该分析分析我们当前项目的瓶颈在哪里,但这不能简单的凭感觉,我们应该用性能分析的工具来进行分析。

18.1 VS编译器下性能分析的操作步骤

  VS编译器中就带有性能分析的工具的,我们可以依次点击“调试→性能和诊断”进行性能分析,注意该操作要在Debug模式下进行。
  同时我们将代码中n的值由10000调成了1000,否则该分析过程可能会花费较多时间,并且将malloc的测试代码进行了屏蔽,因为我们要分析的是我们实现的高并发内存池。

int main()
{
	size_t n = 1000;
	cout << "==========================================================" <<endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	//BenchmarkConcurrentMalloc(n, 1, 10);
	cout << endl << endl;

	//BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<endl;
	return 0;
}

  在点击了“调试→性能和诊断”后会弹出一个提示框,我们直接点击“开始”进行了。然后会弹出一个选项框,这里我们选择的是第二个,因为我们要分析的是各个函数的用时时间,然后点击下一步。出现以下选项框继续点击下一步。最后点击完成,就可以等待分析结果了。

18.2 分析性能瓶颈

  通过分析结果可以看到,光是Deallocate和MapObjectToSpan这两个函数就占用了一半多的时间。
在这里插入图片描述

  而在Deallocate函数中,调用ListTooLong函数时消耗的时间是最多的。
在这里插入图片描述

  继续往下看,在ListTooLong函数中,调用ReleaseListToSpans函数时消耗的时间是最多的。
在这里插入图片描述

  再进一步看,在ReleaseListToSpans函数中,调用MapObjectToSpan函数时消耗的时间是最多的。
在这里插入图片描述

  也就是说,最终消耗时间最多的实际就是MapObjectToSpan函数,我们这时再来看看为什么调用MapObjectToSpan函数会消耗这么多时间。通过观察我们最终发现,调用该函数时会消耗这么多时间就是因为锁的原因。
在这里插入图片描述

  因此当前项目的瓶颈点就在锁竞争上面,需要解决调用MapObjectToSpan函数访问映射关系时的加锁问题。tcmalloc当中针对这一点使用了基数树进行优化,使得在读取这个映射关系时可以做到不加锁。

十九、针对性能瓶颈使用基数树进行优化

  基数树实际上就是一个分层的哈希表,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等。

19.1 单层基数树

  单层基数树实际采用的就是直接定址法,每一个页号对应span的地址就存储数组中在以该页号为下标的位置。
  最坏的情况下我们需要建立所有页号与其span之间的映射关系,因此这个数组中元素个数应该与页号的数目相同,数组中每个位置存储的就是对应span的指针。

// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS;
	void** array_;

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap1() {
		//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
		size_t size = sizeof(void*) << BITS;
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const {
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) {
		array_[k] = v;
	}
};

  代码中的非类型模板参数BITS表示存储页号最多需要比特位的个数。在32位下我们传入的是32-PAGE_SHIFT,在64位下传入的是64-PAGE_SHIFT。而其中的LENGTH成员代表的就是页号的数目,即 2 B I T S 2^{BITS} 2BITS

19.2 二层基数树

  这里还是以32位平台下,一页的大小为8K为例来说明,此时存储页号最多需要19个比特位。而二层基数树实际上就是把这19个比特位分为两次进行映射。
  比如用前5个比特位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的比特位在基数树的第二层进行映射,映射后最终得到该页号对应的span指针。

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;

	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

  因此在二层基数树中有一个Ensure函数,当需要建立某一页号与其span之间的映射关系时,需要先调用该Ensure函数确保用于映射该页号的空间是开辟了的,如果没有开辟则会立即开辟。

19.3 三层基数树

  上面一层基数树和二层基数树都适用于32位平台,而对于64位的平台就需要用三层基数树了。三层基数树与二层基数树类似,三层基数树实际上就是把存储页号的若干比特位分为三次进行映射。
  此时只有当要建立某一页号的映射关系时,再开辟对应的数组空间,而没有建立映射的页号就可以不用开辟其对应的数组空间,此时就能在一定程度上节省内存空间。

// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Node* root_;                          // Root of radix tree
	void* (*allocator_)(size_t);          // Memory allocator

	Node* NewNode() {
		Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}

	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
	}

	void set(Number k, void* v) {
		ASSERT(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}

			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
	}
};

  因此当我们要建立某一页号的映射关系时,需要先确保存储该页映射的数组空间是开辟好了的,也就是调用代码中的Ensure函数,如果对应数组空间未开辟则会立马开辟对应的空间。

二十、使用基数树进行优化代码实现

20.1 代码更改说明

  现在我们用基数树对代码进行优化,此时将PageCache类当中的unorder_map用基数树进行替换即可,由于当前是32位平台,因此这里随便用几层基数树都可以。

//单例模式
class PageCache
{
public:
	//...
private:
	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
};

  此时当我们需要建立页号与span的映射时,就调用基数树当中的set函数。

_idSpanMap.set(span->_pageId, span);

  而当我们需要读取某一页号对应的span时,就调用基数树当中的get函数。

Span* ret = (Span*)_idSpanMap.get(id);

  并且现在PageCache类向外提供的,用于读取映射关系的MapObjectToSpan函数内部就不需要加锁了。

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
	Span* ret = (Span*)_idSpanMap.get(id);
	assert(ret != nullptr);
	return ret;
}

20.2 读取基数树映射关系时不需要加锁

  当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论我们用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁的。
  因为C++中map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当我们在插入数据时其底层的结构都有可能会发生变化。比如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此我们在读取映射关系的时候是需要加锁的。
  对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,而建立映射的操作都是在page cache进行的。也就是说,读取映射时读取的都是对应span的_useCount不等于0的页,而建立映射时建立的都是对应span的_useCount等于0的页,所以说我们不会同时对同一个页进行读取映射和建立映射的操作。

20.3 再次对比malloc进行测试

  还是同样的代码,只不过我们用基数树对代码进行了优化,这时测试固定大小内存的申请和释放的结果如下:

在这里插入图片描述
  可以看到,这时就算申请释放的是固定大小的对象,其效率都是malloc的两倍。下面在申请释放不同大小的对象时,由于central cache的桶锁起作用了,其效率更是变成了malloc的好几倍。
在这里插入图片描述

二十一、项目源码

gitee: https://gitee.com/sushangzl/high-concurrency-memory-pool

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1497553.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[c/c++] const

const 和 #define 的区别 ? const 和指针一块出现的时候&#xff0c;到底谁不能修改 &#xff1f; const 和 volatile 能同时修饰一个变量吗 ? const 在 c 中的作用 ? 1 const 和 #define 的区别 const 和 #define 的相同点&#xff1a; (1) 常数 const 和 #define 定…

vue3中el-input输入无效的原因之一

表单的model用的是&#xff1a;reactive let updateForm reactive({ id: 0, className: "" }); reactive的数据不能这么赋值&#xff0c;会破坏响应性 错误方法&#xff08;&#xff09;{ updateForm { id: 0, className: "asdasdas" }; } 解决方法&…

【Qt】初识Qt

文章目录 一. 行业岗位介绍二. 什么是客户端&#xff1f;三. GUI 开发的各自技术方案四. 什么是框架&#xff1f;五. Qt 的发展史五. Qt 支持的系统六. Qt 的优点 一. 行业岗位介绍 二. 什么是客户端&#xff1f; 既然 Qt 是用来进行客户端开发的&#xff0c;那我们就要了解什…

C#使用iText7给PDF文档添加书签

上一篇文章将SqlSugar官网文档中每个链接对应的网页生成独立PDF文档再合并为单个PDF文档&#xff0c;但是没有书签&#xff0c;八百多页的内容查找和跳转都不方便&#xff0c;本文学习和使用iText7给PDF文档添加多级书签。   添加多级书签分为两大步骤&#xff1a;1&#xff…

Java EE之线程安全问题

一.啥是线程安全问题 有些代码&#xff0c;在单个线程执行时完全正确&#xff0c;但同样的代码让多个线程同时执行&#xff0c;就会出现bug。例如以下代码&#xff1a; 给定一个变量count&#xff0c;让线程t1 t2分别自增5000次&#xff0c;然后进行打印&#xff0c;按理说co…

小游戏加固方案已全面适配微信、QQ、抖音、快手、美团、华为、支付宝渠道

2023年&#xff0c;国内移动游戏收入与游戏用户规模双双创下历史新高。其中小游戏异军突起&#xff0c;市场规模达到200亿元&#xff0c;同比增长300%&#xff0c;成了万众瞩目的行业新风口。 小游戏的高速发展带来了更多的活力&#xff0c;产出了多款月流水过亿的热门游戏。行…

深入解析Mybatis-Plus框架:简化Java持久层开发(八)

&#x1f340; 前言 博客地址&#xff1a; CSDN&#xff1a;https://blog.csdn.net/powerbiubiu &#x1f44b; 简介 本章节介绍如何通过Mybatis-Plus更新数据库中的数据。 本章节不需要前置准备&#xff0c;继续使用之前的测试类&#xff0c;数据库表进行操作。 &#x1f4…

基于单片机的机动车智能远光灯系统设计

目 录 摘 要 I Abstract II 引 言 1 1 主要研究内容及总体设计方案 3 1.1 主要研究内容 3 1.2 系统总体方案选择 3 1.3 系统功能的确定 4 2 硬件电路的设计 5 2.1 单片机控制模块设计 5 2.2 液晶显示模块电路设计 7 2.3 远近灯光电路设计 9 2.4 按键电路设计 9 2.5 超声波电路…

XSS渗透与防御

一、HTTP协议回顾 二、客户端的Cookie 三、服务端的Session 四、JavaScript操作Cookie 使用js语法查看当前网站的cookie 使用js语法添加cookie值 添加unamewuya 刷新网页可以看到添加的cookie值已经发送给服务器 五、脚本注入网页-XSS 六、XSS检测和利用 xsser可以检测网页是…

coqui-ai/TTS 安装使用

Coqui AI的TTS是一款开源深度学习文本转语音工具&#xff0c;以高质量、多语言合成著称。它提供超过1100种语言的预训练模型库&#xff0c;能够轻松集成到各种应用中&#xff0c;并允许用户通过简单API进行个性化声音训练与微调。其技术亮点包括但不限于低资源适应性&#xff0…

7.1 支付模块 - 用户选课

支付模块 - 需求分析、添加选课 文章目录 支付模块 - 需求分析、添加选课一、需求分析1.1 选课业务流程1.2 支付业务流程1.3 在线学习业务流程1.4 课程续期业务流程 二、添加选课2.1 执行流程2.2 数据模型2.2.1 选课记录表 choose_course2.2.2 用户课程表 course_tables 2.3 查…

在多文件编译时,如果模板类的成员函数的定义和模板类不在一个文件下会怎么样?

编译器将找不到成员函数的定义&#xff0c;哪怕你将存放成员函数定义的test.cpp一块编译&#xff0c;编译器也无法找到该模板类的成员函数的定义。 正确的做法是&#xff1a; 将模板类的声明和成员函数定义都定义在.h文件下

星辰天合参与编制 国内首个可兼顾 AI 大模型训练的高性能计算存储标准正式发布

近日&#xff0c;在中国电子工业标准化技术协会高标委的支持和指导下&#xff0c;XSKY星辰天合作为核心成员参与编制的《高性能计算分布式存储系统技术要求》团体标准&#xff0c;在中国电子工业标准化技术协会网站正式发布。 该团体标准强调了分布式存储系统对包括传统高性能计…

libftdi库编译

目录 1. 下载源码 2. Ubuntu下编译 2.1 配置编译环境 2.2 编译 3. Android NDK下编译 3.1 编译libconfuse 3.2 编译libusb 3.3 编译libudev 3.3 编译libftdi 分2部分&#xff0c;先在Ubuntu中编译&#xff0c;然后在Android NDK中编译。 1. 下载源码 下载地址&#…

开源文生图大模型Playground v2.5发布:超越SD、DALL·E 3和 Midjourney

前言 在AI技术迅速发展的今天&#xff0c;文生图模型成为了艺术创作、设计创新等领域的重要工具。Playground v2.5的发布&#xff0c;不仅在技术上取得了突破&#xff0c;更在开源文化的推广与实践上迈出了重要一步。 Huggingface模型下载&#xff1a;https://huggingface.co/…

一文读懂 Databend 的开放表格式引擎

本文介绍了 Databend 开放表格式引擎的支持情况&#xff0c;包括优势与不足、使用方法、与 Catalog 方案的对比。此外&#xff0c;还包含一个简单的 Workshop &#xff0c;介绍如何利用 Databend Cloud 分析位于对象存储中的 Delta Table 。 Databend 近期发布 Apache Iceberg …

如何排查合并问题——《OceanBase诊断系列》之七

1. 前言 OceanBase数据库的存储引擎以 LSM-Tree 架构为基础&#xff0c;区分静态基线数据&#xff08;存储在只读SSTable&#xff09;和动态增量数据&#xff08;存储在可读写MemTable&#xff09;。其中 SSTable 是只读的&#xff0c;一旦生成就不再被修改&#xff0c;存储于…

每日OJ题_链表①_力扣2. 两数相加

目录 力扣2. 两数相加 解析代码 力扣2. 两数相加 2. 两数相加 难度 中等 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个…

MyBatis-Flex学习总结

写在前面的话 MyBatis-Flex 是一个优雅的 MyBatis 增强框架&#xff0c;它非常轻量、同时拥有极高的性能与灵活性。我们可以轻松的使用 Mybaits-Flex 链接任何数据库&#xff0c;其内置的 QueryWrapper 帮助我们极大的减少了 SQL 编写的工作的同时&#xff0c;减少出错的可能性…

VPN应用场景典型案例-站点到站点组网应用

组网需求 站点到站点IPSEC隧道也是LAN -to -LAN IPSec描述的是两个局域网之间建立IPSec隧道的概念,建立站到站IPSec隧道时,两个专用网络之间跨越一个公用网络,这样就可以实现私有网络A:192.168.0.0/24到私有网络B:192.168.1.0/24之间的安全通信。以下是该典型环境的组网图…