[项目设计]高并发内存池

news2025/1/11 10:49:18

目录

1、项目介绍

2、高并发内存池整体框架设计

3、thread cache

<1>thread cache 哈希桶对齐规则

<2>Thread Cache类设计

4、Central Cache

<1>Central Cache类设计

5、page cache

<1>Page Cache类设计

 6、性能分析

<1>定长内存池实现

<2>基数树

 7、项目源码及项目总结


1、项目介绍

        当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存 分配相关的函数(malloc、free)。
        另一方面tcmalloc是全球大厂google开源的,可以认为当时顶尖的C++高手写出来的,他的知名度也是非常高的,不少公司都在用它,Go语言直接用它做了自己内存分配器。
        本文是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的是学习tcamlloc的精华。
应用技术
数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等。
什么是内存池?

想必大家看到这几个字也应该自己能想出个大概,简单来说内存池是指程序预先从操作系统申请一块足够大内存,然后自己管理。此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。 

内存池主要解决了效率问题,避免频繁找操作系统申请内存。 其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。 

那么什么是内存碎片呢?  

内存碎片分为外碎片和内碎片,内碎片我们在下文项目中具体解释(这里我们简单概述一下),这里我们主要看比较容易理解的外碎片如图:

 现有768Byte的空间,但是如果我们要申请超过512Byete的空间却申请不出来,因为这两块空间碎片化不连续了。

内碎片:内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。(具体见下文哈希桶对齐规则)

 注:我们下面实现的内存池主要是是尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片的产生。

malloc

C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的, 而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程 序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。

                 

2、高并发内存池整体框架设计

        现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题
  1. 性能问题。
  2. 多线程环境下,锁竞争问题。
  3. 内存碎片问题。

                      

concurrent memory pool主要由3个部分构成:
1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个thread cache,这也就是这个并发线程池高效的地方
2. central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的对象没有内存时才会找central cache,所以这里竞争不会很激烈
3. page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache 会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题

3、thread cache

        thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。

        thread cache支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的。

        这时我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐(见下文对齐规则),例如我们让这些字节数都按照8字节进行向上对齐(考虑到32位和64位下指针大小),那么thread cache的结构就是下面这样的,此时当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推。

申请内存:
1. 当内存申请 size<=256KB ,先获取到线程本地存储的 thread cache 对象,计算 size 映射的哈希桶自由链表下标i
2. 如果自由链表 _freeLists[i] 中有对象,则直接 Pop 一个内存对象返回。
3. 如果 _freeLists[i] 中没有对象时,则批量从 central cache 中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
1. 当释放内存小于 256k 时将内存释放回 thread cache ,计算 size 映射自由链表桶位置 i ,将对象 Push到_freeLists[i]
2. 当链表的长度过长,则回收一部分内存对象到 central cache

通过上图分析我们需要一个自由链表来管理内存块,下面我们来对自由链表进行封装(仅写出当前一些很容易想到的接口,后序需要我们在进行添加)。

class FreeList {
private:
	void* _freelist = nullptr;
	size_t _size = 0;    //记录链表长度
	size_t _MaxSize = 1; //控制慢增长
public:
	void push(void* obj) {
		assert(obj);
		//头插
		CurNext(obj) = _freelist;
		_freelist = obj;
		++_size;
	}
	void* popFront() {
		assert(_freelist);
		void* cur = _freelist;
		_freelist = CurNext(_freelist);
		--_size;
		return cur;
	}
	bool Empty() {
		return _freelist == nullptr;
	}
	size_t& Size() {
		return _size;
	}
	size_t& MaxSize() {
		return _MaxSize;
	}
};

<1>thread cache 哈希桶对齐规则

字节数对齐数哈希桶下标区间桶数量
[1,128]8byte对齐freelist[0,16)16
    [128+1,1024]    16byte对齐freelist[16,72)56
[1024+1,8*1024]128byte对齐freelist[72,128)56
[8*1024+1,64*1024]1024byte对齐freelist[128,184)56
[64*1024+1,256*1024]8*1024byte对齐freelist[184,208)24

        上文中我们已经提到过内碎片这个概念,我们应该尽可能减少内碎片的产生。按照上面对齐规则的话整体控制在10%左右的内碎片浪费,第一个区间我们不做考虑因为1字节就算对齐到2字节也会产生50%的空间浪费,我们从第二个区间开始计算比如我们申请130个字节实际给到的是145字节实际多给了15字节。15/145 ≈ 0.103,浪费了大概在10左右,下面几个区间大家可以自己计算下浪费率也是大概在10%左右。

有了对齐规则我们还需要计算出相应的内存对应在哪一个桶中,如上表中每个区间都有一定桶的数量如[1,128]有16个桶那么我们申请1~8字节都对应在下标0号桶中,9~16字节都对应在下标1号桶中于是我们需要一个函数来处理计算。

对齐规则和下标映射规则编写

为了后序使用方便我们,将其封装在一个类当中。

static const size_t PAGE_SHIFT = 13;
static const size_t MAX_BYTES = 256 * 1024; //最大字节数

//计算对象大小对齐规则
class SizeClass {
public:
	//20 8 --> 24
	//110 8 --> 112
	//容易想到的
	//size_t _AlignSize(size_t bytes, size_t alignNum) {
	//	size_t alignSize;
	//	if (bytes % alignNum == 0) {
	//		alignSize = bytes;
	//	}
	//	else {
	//		alignSize = (bytes / alignNum + 1) * alignNum;
	//	}
	//  return alignSize;
	//}

	static inline size_t _AlignSize(size_t bytes, size_t alignNum){
		return ((bytes + alignNum - 1) & ~(alignNum - 1));
	}
	//对齐大小计算
	static inline size_t AlignSize(size_t bytes) {
		//assert(bytes <= MAX_BYTES);

		if (bytes <= 128) {
			return _AlignSize(bytes, 8);
		}
		else if (bytes <= 1024) {
			return _AlignSize(bytes, 16);
		}
		else if (bytes <= 8 * 1024) {
			return _AlignSize(bytes, 128);
		}
		else if (bytes <= 64 * 1024) {
			return _AlignSize(bytes, 1024);
		}
		else if (bytes <= 256 * 1024) {
			return _AlignSize(bytes, 8 * 1024);
		}
		else {
			return _AlignSize(bytes, 1 << PAGE_SHIFT); //页对齐
		}
	}

	//容易想到的
	//size_t _Index(size_t bytes, size_t alignNum) {
	//	if (bytes % alignNum == 0) {
	//		return bytes / alignNum - 1; //下标从0开始
	//	}
	//	else {
	//		return bytes / alignNum;
	//	}
	//}

	//20 3  --> 1
	//130 4 --> 8
    //好的实现方法
	static inline size_t _Index(size_t bytes, size_t align_shift){
		return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
	}

	// 计算映射的哪一个自由链表桶
	static inline size_t Index(size_t bytes){
		assert(bytes <= MAX_BYTES);

		static int group_array[4] = { 16, 56, 56, 56 }; // 每个区间有多少个链
		if (bytes <= 128) {
			return _Index(bytes, 3); //传2的次方
		}
		else if (bytes <= 1024) {
			return _Index(bytes - 128, 4) + group_array[0];
		}
		else if (bytes <= 8 * 1024) {
			return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
		}
		else if (bytes <= 64 * 1024) {
			return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
		}
		else if (bytes <= 256 * 1024) {
			return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
		}
		else {
			assert(false);
		}

		return -1;
	}
};

<2>Thread Cache类设计

通过上述内存申请分析,可以想到我们需要申请内存函数和释放内存函数(释放内存函数分两种情况:链表长度较短直接将对象挂在链表中,链表长度过长释放链表)以及从中心缓存(central cache)获取对象的一个函数。如下定义:

class ThreadCache {
private:
	FreeList _freelists[NFREELIST];
public:
	void* Allocate(size_t size);  //申请内存
	void Deallocate(void* ptr, size_t size); //释放内存
	void ListTooLong(FreeList& list, size_t size); //链表太长释放链表

	//从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);
};

//TLS thread local storage  --> 线程局部存储
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

接口实现:


//申请内存
void* ThreadCache::Allocate(size_t size) {
	assert(size <= MAX_BYTES);
	//传过来申请字节数计算出对齐大小(实际给到的内存大小)
	size_t alignSize = SizeClass::AlignSize(size);
	size_t index = SizeClass::Index(size);
	if (!_freelists[index].Empty()) {
		return _freelists[index].popFront();
	}
	else {
		//从中心缓存获取对象
		//...
	}
}
//释放内存
void ThreadCache::Deallocate(void* ptr, size_t size) {
	assert(ptr);
	assert(size <= MAX_BYTES);
	//找到对应桶位置进行插入
	size_t index = SizeClass::Index(size);
	_freelists[index].push(ptr);

	//当链表长度大于一次申请的最大内存值将其还给central cache
	if (_freelists[index].Size() >= _freelists[index].MaxSize()) {
		ListTooLong(_freelists[index], size);
	}
}
//链表太长释放链表
void ThreadCache::ListTooLong(FreeList& list, size_t size) {
	//首先从原链表中将这段链表删除,接着还给中心缓存
	//在freelist中增加区间删除函数
	void* start = nullptr;
	void* end = nullptr;
	list.PopRange(start, end, list.Size());

	//将链表还给中心缓存
	//...
}

//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) {
	//慢反馈调节算法
	//1、最开始不会一次向central cache批量要太多,因为要太多有可能会用不完
	//2、如果不断size大小内存需求,那么batchNum就会不断增长直到上限
	//3、size越小一次向central cache要的batchNum越小
	//4、size越大一次向central cache要的batchNum越大
	size_t batchNum = std::min(_freelists[index].MaxSize(), SizeClass::NumMoveSize(size));
	if (_freelists[index].MaxSize() == batchNum) {
		_freelists[index].MaxSize() += 1;
	}

	//调用cnetral cache中获取对象接口
	//...
}

线程局部存储TLS(Thread Local Storage)

要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。但不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache。

在FreeList类中添加PopRange函数

	void PopRange(void*& start, void*& end, size_t n) {
		assert(n <= _size);
		start = _freelist;
		end = start;
		for (int i = 0; i < n - 1; i++) {
			end = CurNext(end);
		}
		_freelist = CurNext(end);
		CurNext(end) = nullptr;
		_size -= n;
	}

在SizeClass类中添加NumMoveSize函数

	// 一次thread cache从中心缓存获取多少个
	//也就是计算可以给到你几个对象
	//其中上限512,下限2也可以理解为限制桶中链表的长度
	static size_t NumMoveSize(size_t size)
	{
		assert(size > 0);

		// [2, 512],一次批量移动多少个对象的(慢启动)上限值
		//对象越小,计算出的上限越高
		//对象越大,计算出的上限越低
		int num = MAX_BYTES / size;
		if (num < 2)
			num = 2;

		if (num > 512)
			num = 512;

		return num;
	}

4、Central Cache

central cache 也是一个哈希桶结构,他的哈希桶的映射关系跟 thread cache 是一样的。不同的是他的每个哈希桶位置挂是SpanList 链表结构,不过每个映射桶下面的 span 中的大内存块被按映射关系切成了一个个小内存块对象挂在span 的自由链表中。

申请内存:
1. 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
2. central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
3. central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count。
释放内存:
1. 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时 --use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。

<1>Central Cache类设计

根据上图我们可以知道central cache也是一个哈希桶结构,通中挂的是一个个的span并且是双链表,而sapn中又包含了freellist,如下定义:

首先定义一个SpanNode的节点来表示一个个的Span对象


struct SpanNode {
	PAGE_ID _pageId = 0; //大块内存起始页号
	size_t _n = 0; //页的数量

	SpanNode* _next = nullptr;
	SpanNode* _prev = nullptr;

	void* _freeList = nullptr; //自由链表
	size_t _size = 0; //切好小对象大小
	size_t _useCount = 0; //分配给thread cache小块内存数量
	
	bool _isUse = false; //是否正在被使用

};

接着实现一个双链表结构用来将Span挂接起来

class SpanList {
private:
	SpanNode* _head = nullptr; //头节点
	std::mutex _mtx; //桶锁
public:
	SpanList() {
		_head = new SpanNode;
		_head->_next = _head;
		_head->_prev = _head;
	}
	~SpanList() {
		delete _head;
	}
	std::mutex& getMutex() {
		return _mtx;
	}
	SpanNode* Begin() {
		return _head->_next;
	}
	SpanNode* end() {
		return _head;
	}
	bool Empty() {
		return _head->_next == _head;
	}
	void Insert(SpanNode* pos, SpanNode* newSpan) {
		assert(pos && newSpan);
		SpanNode* prev = pos->_prev;

		prev->_next = newSpan;
		newSpan->_prev = prev;

		pos->_prev = newSpan;
		newSpan->_next = pos;	

	}

	void Erase(SpanNode* pos) {
		assert(pos);
		assert(pos != _head);
		SpanNode* prev = pos->_prev;
		SpanNode* next = pos->_next;

		prev->_next = next;
		next->_prev = prev;
	}

	SpanNode* PopFront() {
		SpanNode* ret = _head->_next;
		Erase(ret);
		return ret;
	}

	void PushFront(SpanNode* spanNode) {
		Insert(Begin(), spanNode);
	}
};

Central Cache类定义

和Thread Cache不同的是,每个线程中都会有一个Thread Cache对象以防止各个线程互相抢占内存的问题。而Central Cache为所有线程所共享且全局只有一个Central Cache对象,故我们应该将其实现为单例模式(其中懒汉模式涉及线程安全问题,故我们将其实现为懒汉模式)。其主要作用是向下为Thread Cache提供内存,向上向Page Cache申请内存。回收内存也是同理。故我们需要定义一下几个接口,从Page Cache获取非空spanNode、为Thread Cache提供内存、将内存释放回Page Cahce三个函数。如下定义:
//单例模式(懒汉)
class CentralCache {
private:
	SpanList _SpanLists[NFREELIST];
private:
	CentralCache() {}
	CentralCache(const CentralCache&) = delete;
	static CentralCache _sInst; //类外初始化
public:
	static CentralCache* GetInStance() {
		return &_sInst;
	}

	// 获取一个非空的spanNode
	SpanNode* GetOneSpan(SpanList& list, size_t size);

	// 从中心缓存获取一定数量的对象给thread cache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

	// 将一定数量的对象释放到span
	void ReleaseListToSpans(void* start, size_t byte_size);
};

接口实现:

// 获取一个非空的spanNode
SpanNode* CentralCache::GetOneSpan(SpanList& list, size_t size) {
	//查看当前桶中的每个SpanNode节点是否有未分配的对象
	SpanNode* it = list.Begin();
	while (it != list.end()) {
		if (it->_freeList != nullptr) {
			return it;
		}
		else {
			it = it->_next;
		}
	}
	//走到这里说明当前桶中每个节点没有未分配的对象了,找pageCache要
	//...
	
}

// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) {
	size_t index = SizeClass::Index(size);

	//这里会涉及多个线程同时从中心缓存获取内存的情况,故应该加锁
	_SpanLists[index].getMutex().lock();
	SpanNode* spanNode = GetOneSpan(_SpanLists[index], size);
	assert(spanNode);
	assert(spanNode->_freeList);

	//从spanNode中获取batchNum个对象,如果不够有多少拿多少
	start = spanNode->_freeList;
	end = start;
	int i = 0;
	int ActualNum = 1;
	while (i < batchNum - 1 && CurNext(end) != nullptr) {
		end = CurNext(end);
		++i;
		++ActualNum;
	}
	//从spanNode将这段链表删除,并统计出该spanNode中自由链表已经使用的数量
	spanNode->_freeList = CurNext(end);
	CurNext(end) = nullptr;
	spanNode->_useCount += ActualNum;
	_SpanLists[index].getMutex().unlock();

	return ActualNum;
}

关于当链表太长时要将内存还给SpanNode时情况稍微有点复杂,需要好好思考一下。因为还回来的链表中的每个节点的地址我们没办法确定是否连续的。这就导致有可能换回来一个链表但其中的节点分布于_SpanList的多个SpanNode节点中,这就需要对其筛选让其进入不同的桶中(这部分代码我们放到Page Cache中来实现,因为Central Cache中的SpanNode节点都是Page Cache分配给他的),这里我们先把大概逻辑顺一下如下图:

测试代码如下: 

void TestAddressPage() {
	PAGE_ID id1 = 3000;
	PAGE_ID id2 = 3001;
	char* p1 = (char*)(id1 << PAGE_SHIFT);
	char* p2 = (char*)(id2 << PAGE_SHIFT);
	while (p1 < p2) {
		cout << (void*)p1 << " : " << ((PAGE_ID)p1 >> PAGE_SHIFT) << endl;
		p1 += 8;
	}
}

 执行结果:

                  

由上图我们可以看出,事实和我们的推论是一样的。释放内存代码如下图:

//将一定数量的对象释放到span
void CentralCache::ReleaseListToSpans(void* start, size_t byte_size) {
	size_t index = SizeClass::Index(byte_size);

	_SpanLists[index].getMutex().lock();
	//将传过来的链表一个个头插进对应的spanNode中
	while (start) {
		void* next = CurNext(start);
		//通过链表每个节点的地址来获取对应的spanNode地址
		SpanNode* SpanNode = PageCache::GetInstance()->MapObjectToSpan(start);
		CurNext(start) = SpanNode->_freeList;
		SpanNode->_freeList = start;
		SpanNode->_useCount--; //每头插回来一个已使用数量减一

		//当SpanNode已使用数量为0时说明该节点中的自由链表节点都还回来了,继续归还给PageCache处理
		if (SpanNode->_useCount == 0) {
			//归还给PageCache
			//...
		}

		start = next;
	}
	_SpanLists[index].getMutex().unlock();
}

5、page cache

        首先,central cache的映射规则与thread cache保持一致,而page cache的映射规则与它们都不相同。page cache的哈希桶映射规则采用的是直接定址法,比如1号桶挂的都是1页的span,2号桶挂的都是2页的span,以此类推。

  其次,central cache每个桶中的span被切成了一个个对应大小的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,而如何切分申请到的这个span就应该由central cache自己来决定。

     

注:上图中1page和3page桶下面挂的链在最开始的时候是没有的他们都是由128page(我们每次向系统申请的是固定大小128page)切分后挂到上面的(切分逻辑见下文代码)。

申请内存:
1. 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
2. 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
3. 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
释放内存:
1. 如果 central cache 释放回一个 span 则依次寻找 span 的前后 page id 的没有在使用的空闲 span 看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的 span ,减少 内存碎片

<1>Page Cache类设计

通过上面的分析我们可以知道Page Cache和Central Cache一样是被多个线程共享的故也应该将其设计为单例模式,上面在讲Central Cache释放逻辑是提到我们要根据自由链的节点地址找到其对应SpanNode节点故我们还应该添加其映射关系这里我们采用unordered_map进行映射。此外我们还需要对Central Cache提供对象的接口、回收SpanNode的接口、获取映射关系的接口,如下定义:

static const size_t NPAGES = 129; //下标是从0开始的

class PageCache {
private:
	SpanList _pageLists[NPAGES];
	std::unordered_map<PAGE_ID, SpanNode*> _idSpanNodeMap;
	std::mutex _PageMtx;
private:
	PageCache() {};
	PageCache(const PageCache&) = delete;
	static PageCache _sInst;
public:
	static PageCache* GetInstance() {
		return &_sInst;
	}

	std::mutex& GetMutex() {
		return _PageMtx;
	}
	//获取n页SpanNode
	SpanNode* NewSpan(size_t n);

	//获取从对象到SpanNode的映射
	SpanNode* MapObjectToSpanNode(void* obj);

	//释放空闲SpanNode回到PageCache,并合并相邻的SpanNode
	void ReleaseSpanNodeToPageCache(SpanNode* SpanNode);
};

代码实现:

当Page Cache中无内存时我们需要向系统取申请内存,故我们应该提供一个向系统申请内存的接口,如下

windows和Linux下如何直接向堆申请页为单位的大块内存:

// 去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif

	if (ptr == nullptr)
		throw std::bad_alloc();

	return ptr;
}

inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
	VirtualFree(ptr, 0, MEM_RELEASE);
#else
	// linux下sbrk unmmap等
#endif
}
static const size_t NPAGES = 129; //下标是从0开始的

PageCache PageCache::_sInst;

//获取n页SpanNode
SpanNode* PageCache::NewSpan(size_t n) {
	assert(n > 0);
	//大于128页直接向堆申请内存
	if (n > NPAGES - 1) {
		void* ptr = SystemAlloc(n);
		SpanNode* spanNode = new SpanNode;
		spanNode->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; //右移13位相当于除8k
		spanNode->_n = n;

		_idSpanNodeMap[spanNode->_pageId] = spanNode; //建立映射方便释放内存
		return spanNode;
	}

	//检查对应桶中是否有sapnNode
	if (!_pageLists[n].Empty()) {
		SpanNode* nSpanNode = _pageLists[n].PopFront();
		//将其中节点建立映射关系映射
		for (PAGE_ID i = 0; i < nSpanNode->_n; i++) {
			_idSpanNodeMap[nSpanNode->_pageId + i] = nSpanNode;
		}
		return nSpanNode;
	}

	//检查后面桶中是否有SpanNode节点,有则进行切分
	for (size_t i = n + 1; i < NPAGES; i++) {
		if (!_pageLists[i].Empty()) {
			//找到第i个桶不为空时先将其取出,切分出n个后在放回相应桶中
			SpanNode* ISpanNode = _pageLists[i].PopFront();
			//开辟出一个节点进行切分,然后返回
			SpanNode* NSpanNode = new SpanNode;
			NSpanNode->_n = n;
			NSpanNode->_pageId = ISpanNode->_pageId;

			ISpanNode->_pageId += n;
			ISpanNode->_n -= n;

			_pageLists[ISpanNode->_n].PushFront(ISpanNode);

			//存储ISpanNode起始页号映射方便回收
			_idSpanNodeMap[ISpanNode->_pageId] = ISpanNode;
			_idSpanNodeMap[ISpanNode->_pageId + ISpanNode->_n - 1] = ISpanNode;

			for (PAGE_ID i = 0; i < NSpanNode->_n; i++) {
				_idSpanNodeMap[NSpanNode->_pageId + i] = NSpanNode;
			}

			return NSpanNode;
		}
	}

	//走到这里说明后面的桶中没有剩余的SpanNode节点,找系统申请一页
	SpanNode* bigSpanNode = new SpanNode;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpanNode->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; 
	bigSpanNode->_n = NPAGES - 1;

	_pageLists[bigSpanNode->_n].PushFront(bigSpanNode); //将申请的大页span放入哈希桶中

	return NewSpan(n); //重新调用进行切分

}
//获取从对象到SpanNode的映射
SpanNode* PageCache::MapObjectToSpanNode(void* obj) {
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;

	std::unique_lock<std::mutex> lock(PageCache::GetInstance()->GetMutex()); //出作用域自动解锁
	auto ret = _idSpanNodeMap.find(id);
	if (ret != _idSpanNodeMap.end()) {
		//找到了进行返回
		return ret->second;
	}
	else {
		//找不到说明出问题了直接报错
		assert(false); 
		return nullptr;
	}
}
//释放空闲SpanNode回到PageCache,并合并相邻的SpanNode
void PageCache::ReleaseSpanNodeToPageCache(SpanNode* spanNode) {

	//大于128页直接还给堆
	if (spanNode->_n > NPAGES - 1) {
		//根据页号算出响应的地址,然后进行释放
		void* ptr = (void*)(spanNode->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		return;
	}

	//向前合并
	while (1) {
		PAGE_ID prev = spanNode->_pageId - 1;
		//找不到跳出循环
		auto ret = _idSpanNodeMap.find(prev);
		if (ret == _idSpanNodeMap.end()) {
			break;
		}
		//prevSpanNode正在被使用跳出循环
		SpanNode* prevSpanNode = ret->second;
		if (prevSpanNode->_isUse == true) {
			break;
		}
		//合并出超出128kb的spanNode不进行管理
		if (prevSpanNode->_n + spanNode->_n > NPAGES - 1) {
			break;
		}

		//进行合并
		spanNode->_pageId = prevSpanNode->_pageId;
		spanNode->_n += prevSpanNode->_n;

		_pageLists[prevSpanNode->_n].Erase(prevSpanNode);

		delete prevSpanNode;

	}

	//向后合并
	while (1) {
		PAGE_ID next = spanNode->_pageId - 1;
		//找不到跳出循环
		auto ret = _idSpanNodeMap.find(next);
		if (ret == _idSpanNodeMap.end()) {
			break;
		}
		//prevSpanNode正在被使用跳出循环
		SpanNode* nextSpanNode = ret->second;
		if (nextSpanNode->_isUse == true) {
			break;
		}

		//合并出超出128kb的spanNode不进行管理
		if (nextSpanNode->_n + spanNode->_n > NPAGES - 1) {
			break;
		}

		//进行合并
		spanNode->_n += nextSpanNode->_n;
		_pageLists[nextSpanNode->_n].Erase(nextSpanNode);
		delete nextSpanNode;
	}

	//放回哈希桶中
	_pageLists[spanNode->_n].PushFront(spanNode);
	spanNode->_isUse = false;

	存储ISpanNode起始页号映射方便回收
	_idSpanNodeMap[spanNode->_pageId] = spanNode;
	_idSpanNodeMap[spanNode->_pageId + spanNode->_n - 1] = spanNode;
}

6、性能分析


参考了一段别人的测试代码,进行测试: 测试代码链接

运行结果,由下图可见我们当前实现的内存池效率和malloc比起来差了许多。
   

这个时候不要去盲目改代码,我们可以在VS中找到性能分析来分析我们的程序看看是什么原因导致的,如下图:

 

 由此我们可以看出,我们大部分时间都浪费在锁上面了而且是map映射的时候,那么有什么办法呢?

其实我们除了上面的锁浪费时间外,还有程序中的new我们可以替换成一个定长内存池(因为其效率比new要高一些,大家可以自行测试一下)。

<1>定长内存池实现

malloc其实就是一个通用的内存池,在什么场景下都可以使用,但这也意味着malloc在什么场景下都不会有很高的性能,因为malloc并不是针对某种场景专门设计的。定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此我们性能方面要比malloc高一些,并且在实现定长内存池时不需要考虑内存碎片等问题,因为我们申请/释放的都是固定大小的内存块。

代码如下:

template<class T>
class FiedMemoryPool {
private:
	char* _memory = nullptr;   //指向大块内存的指针
	void* _freeList = nullptr;//还内存链接自由链表头指针
	size_t _residueBytes = 0;  //剩余字节数
public:
	T* New() {
		T* obj = nullptr;
		//优先把还回来的对象重复利用
		if (_freeList) {
			void* next = *((void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
		}
		else {
			//剩余内存不够一个对象大小时重新开辟内存
			if (_residueBytes < sizeof(T)) {
				_residueBytes = 128 * 1024;
				_memory = (char*)SystemAlloc(_residueBytes >> PAGE_SHIFT);
				if (_memory == nullptr) {
					throw std::bad_alloc();
				}
			}
			//给目标分配内存
			obj = (T*)_memory;
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //至少开辟一个指针大小
			_memory += objSize;
			_residueBytes -= objSize;
		}
		//用定位new显示调用其构造函数
		new (obj)T;

		return obj;
	}
	void Delete(T* obj) {
		//显示调用其析构函数
		obj->~T();

		//头插进自由链表
		*(void**)obj = _freeList;
		_freeList = obj;
	}
};

有了定长内存池我们只需要将代码中new出来的对象替换成用定长内存池来申请就可以了(new 底层依然是调用malloc)由于这部分替换起来比较简单就不在详细讲述了,具体代码参考文章末尾项目源代码。 

 <2>基数树

基数树之所以能够提升效率是因为基数树在使用时是不用加锁的,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射。

 基数树代码链接

有了基数树我们只需要将PageCache中unorder_map定义的对象用基数树来定义就可以了,具体代码实现参考文章末尾源代码实现。

使用上面两种方法对项目进行优化后在进行测试,如图:

由上图很明显可以看出,此时我们实现的内存池在并发环境下效率更胜一筹。 


 7、项目源码

项目源码

项目到这里就算完成了,感觉有用的话期待大家点赞关注,项目中有哪些地方有疑问的话欢迎大家评论留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/338382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

更换主板开机logo

更换主板开机logo前言详细操作步骤可能遇到的问题素材链接前言 在使用刀锋钛主板后发现&#xff0c;开机logo有些不符合个人喜好&#xff0c;如下图&#xff1a; 于是就有了更换主板logo的想法&#xff0c;确定用刷bios这一方法&#xff0c;注&#xff1a;刷BIOS之前一定要做…

MS14-064(OLE远程代码执行漏洞复现)

✅作者简介&#xff1a;CSDN内容合伙人、信息安全专业在校大学生&#x1f3c6; &#x1f525;系列专栏 &#xff1a;内网安全-漏洞复现 &#x1f4c3;新人博主 &#xff1a;欢迎点赞收藏关注&#xff0c;会回访&#xff01; &#x1f4ac;舞台再大&#xff0c;你不上台&#xf…

Java测试——selenium常见操作(2)

这篇博客继续讲解一些selenium的常见操作 selenium的下载与准备工作请看之前的博客&#xff1a;Java测试——selenium的安装与使用教程 先创建驱动 ChromeDriver driver new ChromeDriver();等待操作 我们上一篇博客讲到&#xff0c;有些时候代码执行过快&#xff0c;页面…

Axios异步请求 json格式

Axios是Ajax的一个框架,简化Ajax操作。需要axios.min.js 和vue.js的jar。发送普通参数异步请求以及相应异常情况客户端向服务器端异步发送普通参数值&#xff1a;- 基本格式&#xff1a; axios().then().catch()- 示例&#xff1a;axios({ // axios表示要发送一个异步请求metho…

12月无情被辞:想给还不会自动化测试的技术人提个醒

公司前段时间缺人&#xff0c;也面了不少测试&#xff0c;结果竟没有一个合适的。一开始瞄准的就是中级的水准&#xff0c;也没指望来大牛&#xff0c;提供的薪资在10-20k&#xff0c;面试的人很多&#xff0c;但是平均水平很让人失望。基本能用一句话概括就是&#xff1a;3年测…

火遍全网的ChatGPT,可免费使用啦

啰嗦几句最近最最最火爆的莫过于ChatGPT了&#xff0c;感觉你不知道ChatGPT是什么做什么&#xff0c;你都没法跟人交流了&#xff01;ChatGPT是美国OpenAI研发的聊天机器人程序&#xff0c;跟小冰、小爱、小度一样&#xff0c;但是不一样的是它拥有强大的信息整合能力&#xff…

【性能】性能测试理论篇_学习笔记_2023/2/11

性能测试的目的验证系统是否能满足用户提出的性能指标发现性能瓶颈&#xff0c;优化系统整体性能性能测试的分类注&#xff1a;这些测试类型其实是密切相关&#xff0c;甚至无法区别的&#xff0c;例如几乎所有的测试都有并发测试。在实际中不用纠结具体的概念。而是要明确测试…

子比主题v6.9.2 免费版源码下载及其激活步骤详解

本人版权所有&#xff0c;请勿打回&#xff01; 文章目录一&#xff0c;子比主题v6.9.2 免费版源码下载及其激活步骤1.1什么是Zibll子比主题&#xff1f;1.2特点二.效果展示2.1 部分源码2.2 效果展示三.源码下载及其视频演示3.1源码下载3.2视频演示一&#xff0c;子比主题v6.9.…

Golang map笔记

map定义三种方式package mainimport "fmt"func main() {// map 的基本定义// 第一种方式 使用make分配数据空间var map1 map[string]stringmap1 make(map[string]string, 3)map1["no1"] "北京"map1["no2"] "天津"map1[&q…

Mysql 增删改查(二)—— 增(insert)、删(delete)、改(update)

目录 一、插入 1、insert 2、replace&#xff08;插入否则更新&#xff09; 二、更新&#xff08;update&#xff09; 三、删除 1、delete 2、truncate&#xff08;截断表&#xff0c;慎用&#xff09; 一、插入 1、insert (1) 单行 / 多行插入 全列插入&#xff1a;…

可能是最强的Python可视化神器,建议一试!

数据分析离不开数据可视化&#xff0c;我们最常用的就是Pandas&#xff0c;Matplotlib&#xff0c;Pyecharts当然还有Tableau&#xff0c;看到一篇文章介绍Plotly制图后我也跃跃欲试&#xff0c;查看了相关资料开始尝试用它制图。 1.Plotly Plotly是一款用来做数据分析和可视…

毕业四年换了3份软件测试工作,我为何仍焦虑?

​今天一看日历&#xff1a;2023.2.11 &#xff0c;才突然意识到自己毕业已经四年了。四年时间里一直在测试行业摸爬滚打&#xff0c;现在是时候记录一下了。 下面我来分享下我这4年软件测试经验及成长历程&#xff0c;或许能帮助你解决很多工作中的迷惑。 01、我是如何开始做…

libevent 实现httpserver 终极版C/C++

最近要用C实现哥httpserver,之前探索了很多个http的库。 1. 我之前最习惯用httplib-cpp github.comhttps://github.com/yhirose/cpp-httplib 但是它要求gcc-g版本要大于4.8。然后我用了6.1.0之后&#xff0c;我的其他库比如mysql glog之后怎么都链接不上。换了系统&a…

Https 协议超强讲解(二)

浏览器是如何确保 CA 证书的合法性&#xff1f; 1. 证书包含什么信息&#xff1f; 颁发机构信息 公钥 公司信息 域名 有效期 指纹 …… 2. 证书的合法性依据是什么&#xff1f; 首先&#xff0c;权威机构是要有认证的&#xff0c;不是随便一个机构都有资格颁发证书&am…

【MySQL】第十六部分 MySQL数据类型详解

【MySQL】第十六部分 MySQL数据类型详解 文章目录【MySQL】第十六部分 MySQL数据类型详解16. MySQL数据类型详解16.1 整数类型16.2 浮点类型16.3 定点数类型16.4 位类型 BIT16.5 日期和时间类型16.6 文本字符串类型16.6.1 CHAR VS VARCHAR类型16.6.2 TEXT类型16.6.3 ENUM类型16…

文件管理(9)

文件管理 0 引言 为什么要引入文件系统&#xff1f; 信息管理的需要&#xff1a;用户面前提供一种规格化的机制&#xff0c;方便用户对文件的存取、提高效率。操作系统本身需要–操作系统本身也不是常驻内存的&#xff0c;也有大量的信息需要存于外存。 1 文件定义 文件&a…

Python语言零基础入门教程(十四)

Python 日期和时间 Python 程序能用很多方式处理日期和时间&#xff0c;转换日期格式是一个常见的功能。 Python 提供了一个 time 和 calendar 模块可以用于格式化日期和时间。 时间间隔是以秒为单位的浮点小数。 每个时间戳都以自从1970年1月1日午夜&#xff08;历元&…

Linux上面配置Apache2支持Https(ssl)具体方案实现

虽然Nginx比较流行&#xff0c;但是由于一些老项目用到了Apache2来支持Web服务&#xff0c;最近想给服务上一个Https支持&#xff0c;虽然看似教程简单&#xff0c;但是也遇到一些特殊情况&#xff0c;经历了一番折腾也算是解决了所有问题&#xff0c;将过程记录如下。演示是基…

亿级高并发电商项目-- 实战篇 --万达商城项目 三(通用模块、商品服务模块、后台API模块、IDEA忽略文件显示等开发工作

专栏&#xff1a;高并发项目 &#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是小童&#xff0c;Java开发工程师&#xff0c;CSDN博客博主&#xff0c;Java领域新星创作者 &#x1f4d5;系列专栏&#xff1a;前端、Java、Java中间件大全、微信小程序、微信支付、若依框…

4.SpringWeb

一、创建项目LomBok:辅助开发工具&#xff0c;减少代码编写Spring Web:带上Spring MVC,可以做Web开发了Thymleaf: Web开发末班引擎&#xff08;不常用&#xff09;创建好&#xff0c;如下&#xff1a;static/ 放置静态资源的根目录templates/ 放置模板文件的根目录 二、资源配置…