TCmalloc (google开源项目核心部分模拟实现)

news2024/11/15 17:26:13

1什么是内存池

1.1池化技术

所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过 量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快 捷,大大提高程序运行效率。 在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务 器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端 的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠 状态。

1.2内存池

内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接 向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操 作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。减少用户层向系统层的内存申请调用,实现高效;

2.开胃菜-设计一个定长内存池

此次设计的定长内存池ObjectPool结构如下:(其单位小内存块的大小为T类型的大小)

添加图片注释,不超过 140 字(可选)

  • 需要申请内存时,一次性申请一个大块空间,记录起始位置_memory;

  • 需要使用size大小时,将大块内存抽象切割成小块,将起始位置_memory向后移动size大小;

  • 释放内存时,将需要释放的空间挂入自由链表_freeList,可供下次申请使用;注意:这个链表不是通过内部指针连接下一个,而是通过前一块空间的前4or8个字节记录后一个小空间的起始地址抽象连接的;

这样做的目的是循环利用预先开辟的一大块空间,减少用户层申请内存时与系统层的交互,提高效率;

代码实现:

ObjectPool.h

#pragma once
#define _CRT_SECURE_NO_WARNINGS 1
#include<iostream>
using namespace std;


#ifdef _WIN32
#include<windows.h>
#else
// 包Linux相关头文件,增加代码的可移植性;
#endif

// 直接去堆上按页申请空间 摆脱malloc;
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif

	if (ptr == nullptr)
		throw std::bad_alloc();

	return ptr;
}
	

template<class T>
class ObjectPool
{
	char* _memory = nullptr; // 指向大块内存的指针
	size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数
	void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针

public:
	T* New()
	{
		T* obj = nullptr;
		//如果之前有申请的空间被释放,那就先从free了的空间拿;(内存池技术)
		if (_freeList != nullptr)	
		{
			//头删
            //注意!*((void**)_freeList)相当访取_freeList前4or8个字节操作(由32位系统->指针4byte and 64位系统->指针8byte决定);
			void* next = *((void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
		}
		else
		{
			if (_remainBytes < sizeof(T))
			{	
				// 剩余内存不够一个对象大小时,则重新开大块空间
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remainBytes);
                //使用SystemAlloc直接向堆申请内存,脱离malloc,方便体现malloc和该ObjectPool的差别;
				_memory = (char*)SystemAlloc(_remainBytes >> 13);//char*类型内存可以使其+1or-1的单位操作为1字节,方便内存管理;

				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
				
			}
			//防止某个T类还没当前系统下一个指针大小大 那么就装不下后一个的地址了,这里做特殊处理;至少保证一个对象内足够装的下一个指针大小;
			int Objsize = sizeof(T) < sizeof(void*) ? sizeof(void*): sizeof(T);

			obj = (T*)_memory;
			_memory += Objsize;
			_remainBytes -= Objsize;

		}
		// 定位new,内置类型不处理,自定义类型调用其构造函数;
		new(obj)T;
		return obj;
	}

	void Delete(T* obj)
	{
        //内置类型不处理,自定义类型调用其构析构函数;
		obj->~T();
		//头插(此处不是真正的删除,而是标志为未使用,挂入自由链表)
		*(void**)obj = _freeList;
		_freeList = obj;
	}
}; 

ObjectPool.cpp测试

#include"ObjectPool.h"
#include<vector>
#include "ConcurrentAlloc.h"

//用于测试的类型树节点;
struct TreeNode
{
	int _val;
	TreeNode* _left;
	TreeNode* _right;

	TreeNode()
		:_val(0)
		, _left(nullptr)
		, _right(nullptr)
	{}
};
void TestObjectPool()
{
	// 申请释放的轮次
	const size_t Rounds = 5;

	// 每轮申请释放多少次
	const size_t N = 100000;

	std::vector<TreeNode*> v1;
	v1.reserve(N);

	size_t begin1 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v1.push_back(new TreeNode);
		}
		for (int i = 0; i < N; ++i)
		{
			delete v1[i];
		}
		v1.clear();
	}

	size_t end1 = clock();

	std::vector<TreeNode*> v2;
	v2.reserve(N);

	ObjectPool<TreeNode> TNPool;
	size_t begin2 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v2.push_back(TNPool.New());
		}
		for (int i = 0; i < N; ++i)
		{
			TNPool.Delete(v2[i]);
		}
		v2.clear();
	}
	size_t end2 = clock();

	cout << "new cost time:" << end1 - begin1 << endl;
	cout << "object pool cost time:" << end2 - begin2 << endl;
}
int mian()
{
    TestObjectPool();
    return 0;
}

运行结果:

显然,定长内存池的New空间比new(底层封装的是malloc申请空间)更高效

相关视频推荐

面对内存再不发怵,手把手带你实现内存池(自行准备linux环境)

200行代码实现slab,开启内存池的内存管理

linux内存架构,numa的优势,slab的实现,vmalloc的原理

免费学习地址:c/c++ linux服务器开发/后台架构师

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

3.TCmalloc(高并发内存池)整体框架介绍:

谷歌开源项目:TCMalloc (google-perftools) 是用于优化C++写的多线程应用,比glibc 2.3的malloc快。这个模块可以用来让MySQL在高并发下内存占用更加稳定。

ThreadCache

thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配(设计规定),线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。核心结构是FreeList _freeLists[NFREELIST];eg:_freeLists[2]代表该size对应哈希桶中有n个未使用的obj size大小的小内存块,当需要申请时优先从_freeList中拿无人使用的被挂起的obj空间;

CentralCache

central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对 象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而 其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的 没有内存对象时才会找central cache,所以这里竞争不会很激烈。其核心结构是SpanList _spanLists[NFREELIST];eg:_spanLists[2]代表该size对应链桶中的n个span,每个span下面挂了n个大小为size的obj小内存块;(size大小是指通过测试和计算,将所需内存大小x字节依据某种对齐规则对其为size字节,使span的链式桶结构数量合适,大小统一)

page cache

page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分 配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小 的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache 会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片 的问题。Page中的核心结构是SpanList _spanLists[NPAGES];eg:_spanLists[2]代表该链桶含有n个大小为2页(2*8字节)的span;如果从page[1]到page[128]都没有可用span,那么只能从系统堆直接申请一块大空间放进去,再给central thread层用了;

本文TCmalloc整体项目围绕上述三层结构实现,抽取原项目的小部分核心内容学习;这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁 等等方面的知识。

4.代码实现:

Common.h

这是一个通用头文件,包含各种所需要的库文件和三层结构需要的对其数和size-申请数量转换的函数;

#pragma once

#include <iostream>
#include <vector>
#include <unordered_map>
#include <map>
#include <algorithm>

#include <time.h>
#include <assert.h>

#include <thread>
#include <mutex>
#include <atomic>

using std::cout;
using std::endl;

#ifdef _WIN32
	#include <windows.h>
#else
	// ...
#endif

static const size_t MAX_BYTES = 256 * 1024;//ThreadCache中允许申请的最大字节;
static const size_t NFREELIST = 208;//通过对齐计算的thread和central中最大哈希桶下表;
static const size_t NPAGES = 129;//最多128页,为了方便映射哈希,从第1页开始计算;
static const size_t PAGE_SHIFT = 13;//一页==8kb==2^13字节大小  所以 p地址>>PAGE_SHIFT ==PAGE_ID 可相当于将某连续地址以2^13字节对齐,并标上页号方便管理;eg:0x00000000~0x2^13这连续的2^13个地址 经过>> 为PAGE_ID==1,后连续的2^13个地址 经过>> 为PAGE_ID==2;

//多系统编程;
#ifdef _WIN64
	typedef unsigned long long PAGE_ID;
#elif _WIN32
	typedef size_t PAGE_ID;
#else
	// linux
#endif

// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif

	if (ptr == nullptr)
		throw std::bad_alloc();

	return ptr;
}


inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
	VirtualFree(ptr, 0, MEM_RELEASE);
#else
	// sbrk unmmap等
#endif
}

//NextObj(obj) 等价于 *(void**)obj,取obj前4or8个字节(存放下一个小空间地址的位置)进行操作,增加语义;
static void*& NextObj(void* obj)
{
	return *(void**)obj;
}


// 管理切分好的小对象的自由链表
class FreeList
{
public:
	void Push(void* obj)
	{
		assert(obj);

		// 头插
		//*(void**)obj = _freeList;
		NextObj(obj) = _freeList;
		_freeList = obj;

		++_size;
	}

    //从central中申请一批obj大小的内存块,range插入;
	void PushRange(void* start, void* end, size_t n)
	{
		NextObj(end) = _freeList;
		_freeList = start;
		_size += n;
	}

    ///threadcache还一段list给central cache
	void PopRange(void*& start, void*& end, size_t n)//n==list.size
	{
		assert(n <= _size);
		start = _freeList;
		end = start;

		for (size_t i = 0; i < n - 1; ++i)
		{
			end = NextObj(end);
		}

		_freeList = NextObj(end);
		NextObj(end) = nullptr;
		_size -= n;
	}

	void* Pop()
	{
		assert(_freeList);

		// 头删
		void* obj = _freeList;
		_freeList = NextObj(obj);
		--_size;

		return obj;
	}

	bool Empty()
	{
		return _freeList == nullptr;
	}

	size_t& MaxSize()
	{
		return _maxSize;
	}

	size_t Size()
	{
		return _size;
	}

private:
	void* _freeList = nullptr;
	size_t _maxSize = 1;//慢开始算法--从1启动;
	size_t _size = 0;//当前_freeList桶里的obj小空间个数
};

// 计算对象大小的对齐映射规则 方便控制哈希桶用的数量(不要太多) 便于管理obj小内存空间
class SizeClass
{
public:
	// 整体控制在最多10%左右的内碎片浪费的对齐规则;
	// [1,128]					8byte对齐	    freelist[0,16)       eg:1~128字节的obj   按照8byte对齐(eg:1~7字节的对象都放入_freeList[0]) 	 											  			      则1~128字节的obj  需要的桶index为0~16即可;
	// [128+1,1024]				16byte对齐	    freelist[16,72)		                 
	// [1024+1,8*1024]			128byte对齐	    freelist[72,128)
	// [8*1024+1,64*1024]		1024byte对齐     freelist[128,184)
	// [64*1024+1,256*1024]		8*1024byte对齐   freelist[184,208)

	/*size_t _RoundUp(size_t size, size_t alignNum)
	{
		size_t alignSize;
		if (size % alignNum != 0)
		{
			alignSize = (size / alignNum + 1)*alignNum;
		}
		else
		{
			alignSize = size;
		}

		return alignSize;
	}*/
	// 1-8 
    //位运算提高效率
	static inline size_t _RoundUp(size_t bytes, size_t alignNum)
	{
		return ((bytes + alignNum - 1) & ~(alignNum - 1));
	}
	//通过size大小计算对其数函数;
	static inline size_t RoundUp(size_t size)
	{
		if (size <= 128)
		{
			return _RoundUp(size, 8);
		}
		else if (size <= 1024)
		{
			return _RoundUp(size, 16);
		}
		else if (size <= 8*1024)
		{
			return _RoundUp(size, 128);
		}
		else if (size <= 64*1024)
		{
			return _RoundUp(size, 1024);
		}
		else if (size <= 256 * 1024)
		{
			return _RoundUp(size, 8*1024);
		}
		else
		{
			return _RoundUp(size, 1<<PAGE_SHIFT);
		}
	}

	/*size_t _Index(size_t bytes, size_t alignNum)
	{
	if (bytes % alignNum == 0)
	{
	return bytes / alignNum - 1;
	}
	else
	{
	return bytes / alignNum;
	}
	}*/

	
    //位运算提高效率
	static inline size_t _Index(size_t bytes, size_t align_shift)
	{
		return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
	}

	// 通过size计算index位置即映射到哪一个自由链表桶
	static inline size_t Index(size_t bytes)
	{
		assert(bytes <= MAX_BYTES);

		// 每个区间有多少个链
		static int group_array[4] = { 16, 56, 56, 56 };
		if (bytes <= 128){
			return _Index(bytes, 3);//3表示按8byte对其
		}
		else if (bytes <= 1024){
			return _Index(bytes - 128, 4) + group_array[0];//4表示按16byte对其, - 128因为前128个字节按照别的对齐规则的,剩下的这些按照自己的对其数对其最后+前面总共的桶数量即可计算自己的index;
		}
		else if (bytes <= 8 * 1024){
			return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
		}
		else if (bytes <= 64 * 1024){
			return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
		}
		else if (bytes <= 256 * 1024){
			return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
		}
		else{
			assert(false);
		}

		return -1;
	}

	// 一次thread cache从中心缓存获取多少个,池化技术:当thread cache没有可用obj空间时,会向中心缓存申请一批而不是一个;
	static size_t NumMoveSize(size_t size)
	{
		assert(size > 0);

		// [2, 512],一次批量移动多少个对象的(慢启动)上限值
		// 小对象一次批量上限高
		// 小对象一次批量上限低
		int num = MAX_BYTES / size;
		if (num < 2)
			num = 2;

		if (num > 512)
			num = 512;

		return num;
	}

	// 计算一次向系统获取几个页;当centralCache对应index无可用span时,向pagecache按页大小申请,之后再把申请的span切割成n个index大小的obj内存块;
	// 单个对象 8byte
	// ...
	// 单个对象 256KB
	static size_t NumMovePage(size_t size)
	{
		size_t num = NumMoveSize(size);
		size_t npage = num*size;

		npage >>= PAGE_SHIFT;
		if (npage == 0)
			npage = 1;

		return npage;
	}
};

// 管理多个连续页大块内存跨度结构
struct Span
{
	PAGE_ID _pageId = 0; // 大块内存起始页的页号;将申请的viod*通过>>PAGE_SHIFT(8K),映射成数字方便管理和挂桶;
	size_t  _n = 0;      // 页的数量

	Span* _next = nullptr;	// 双向链表的结构
	Span* _prev = nullptr;

	size_t _objSize = 0;  // 切好的小对象的大小
	size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数,方便回收span,_useCount==0;
	void* _freeList = nullptr;  // 切好的小块内存的自由链表

	bool _isUse = false;          // 是否在被使用,方便合并span,减少内存碎片;
};

// 带头双向循环链表 封装Span节点
class SpanList
{
public:
	SpanList()
	{
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	}
	//Begin()和End()模拟了带头双向循环链表;
	Span* Begin()
	{
		return _head->_next;
	}

	Span* End()
	{
		return _head;
	}

	bool Empty()
	{
		return _head->_next == _head;
	}

	void PushFront(Span* span)
	{
		Insert(Begin(), span);
	}

	Span* PopFront()
	{
		Span* front = _head->_next;
		Erase(front);
		return front;
	}
	//“双向链表实现一个Insert即可高效复用”
	void Insert(Span* pos, Span* newSpan)
	{
		assert(pos);
		assert(newSpan);

		Span* prev = pos->_prev;
		// prev newspan pos
		prev->_next = newSpan;
		newSpan->_prev = prev;
		newSpan->_next = pos;
		pos->_prev = newSpan;
	}

	void Erase(Span* pos)
	{
		assert(pos);
		assert(pos != _head);

		Span* prev = pos->_prev;
		Span* next = pos->_next;

		prev->_next = next;
		next->_prev = prev;
	}

private:
	Span* _head;
public:
	std::mutex _mtx; // 桶锁 可能多个threadcache同时访问central中的同一个index桶,加锁-线程安全
};

ThreadCache.h

thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会 有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。

申请内存:

  1. 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自 由链表下标i。

  2. 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。

  3. 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表 并返回一个对象。

释放内存:

1. 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push 到_freeLists[i]。

2. 当链表的长度过长,则回收一部分内存对象到central cache。

// thread cache本质是由一个哈希映射的对象自由链表构成
class ThreadCache
{
public:
	// 申请和释放内存对象
	void* Allocate(size_t size);
	void Deallocate(void* ptr, size_t size);
	// 从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);
	// 释放对象时,链表过长时,回收内存回到中心缓存
	void ListTooLong(FreeList& list, size_t size);
private:
	FreeList _freeLists[NFREELIST];
};
// TLS thread local storage TLS技术,使每个线程里的ThreadCache*独享,互不影响,实现高并发;
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

// 管理切分好的小对象的自由链表
class FreeList
{
public:
	void Push(void* obj)
	{
		assert(obj);
		// 头插
		//*(void**)obj = _freeList;
		NextObj(obj) = _freeList;
		_freeList = obj;
		++_size;
	}
	void PushRange(void* start, void* end, size_t n)
	{
		NextObj(end) = _freeList;
		_freeList = start;
		_size += n;
	}
	void PopRange(void*& start, void*& end, size_t n)
	{
		assert(n >= _size);


		start = _freeList;
		end = start;
		for (size_t i = 0; i < n - 1; ++i)
		{
			end = NextObj(end);
		}
		_freeList = NextObj(end);
		NextObj(end) = nullptr;
		_size -= n;
	}
	void* Pop()
	{
		assert(_freeList);
		// 头删
		void* obj = _freeList;
		_freeList = NextObj(obj);
		--_size;
		return obj;
	}
	bool Empty()
	{
		return _freeList == nullptr;
	}
	size_t& MaxSize()
	{
		return _maxSize;
	}
	size_t Size()
	{
		return _size;
	}
private:
	void* _freeList = nullptr;
	size_t _maxSize = 1;
	size_t _size = 0;
};

ThreadCache.cpp

#include "ThreadCache.h"
#include "CentralCache.h"

//从thradcache 从 central中要内存
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
	// 慢开始反馈调节算法
	// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
	// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
	// 3、size越大,一次向central cache要的batchNum就越小
	// 4、size越小,一次向central cache要的batchNum就越大
	size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
	if (_freeLists[index].MaxSize() == batchNum)
	{
		_freeLists[index].MaxSize() += 1;
	}
	//输出型参数 将batchNum个obj小空间从centralcache中带出来;
	void* start = nullptr;
	void* end = nullptr;
	size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
	assert(actualNum > 0);

	if (actualNum == 1)//就申请到一个obj空间 直接返回给申请的人用
	{
		assert(start == end);
		return start;
	}
	else//申请了多个obj空间 只返回第一个 则 剩下的插入_freeList留着后面备用;
	{
		_freeLists[index].PushRange(NextObj(start), end, actualNum-1);
		return start;
	}
}

//多线程申请
void* ThreadCache::Allocate(size_t size)
{
	assert(size <= MAX_BYTES);
	size_t alignSize = SizeClass::RoundUp(size);//计算对齐数 相当于需要obj的size
	size_t index = SizeClass::Index(size);//计算哈希桶index

	if (!_freeLists[index].Empty())//还有剩余直接pop拿走一个
	{
		return _freeLists[index].Pop();
	}
	else
	{
		return FetchFromCentralCache(index, alignSize);//没了 从central中要
	}
}

//多线程释放
void ThreadCache::Deallocate(void* ptr, size_t size)
{
	assert(ptr);
	assert(size <= MAX_BYTES);

	// 找对映射的自由链表桶,对象插入进入
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);

    //释放完了检查下freelist里的obj,看看需不需要回收到centralcache
	// 设定:当链表长度大于一次批量申请的内存时就开始还一段list给central cache
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
	{
		ListTooLong(_freeLists[index], size);
	}
}
//一段list给central cache
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
    //输出型参数;
	void* start = nullptr;
	void* end = nullptr;
    //先从_freeList中弹出
	list.PopRange(start, end, list.MaxSize());
    //再收回到central中对应index的Span链表中
	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

CentralCache.h

central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每 个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一 个个小内存块对象挂在span的自由链表中。

申请内存:

1.当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对 象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的 spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不 过这里使用的是一个桶锁,尽可能提高效率。

2.central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的 span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span 中取对象给thread cache。

3.central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count。

释放内存:

1.当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时-- use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache, page cache中会对前后相邻的空闲页进行合并。

#pragma once
#include "Common.h"

// 单例模式(饿汉模式)全局就一个CentralCache;
class CentralCache
{
public:
	static CentralCache* GetInstance()
	{
		return &_sInst;
	}

	// 获取一个非空的span
	Span* GetOneSpan(SpanList& list, size_t byte_size);

	// 从中心缓存获取一定数量的对象给thread cache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

	// 将一定数量的obj释放到span跨度
	void ReleaseListToSpans(void* start, size_t byte_size);
private:
	SpanList _spanLists[NFREELIST];

private:
	CentralCache()
	{}

	CentralCache(const CentralCache&) = delete;

	static CentralCache _sInst;
};

CentralCache.cpp

#include "CentralCache.h"
//#include "PageCache.h"

CentralCache CentralCache::_sInst;

// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)//参数list是某一个确定的index桶
{
	// 查看当前的spanlist中是否有还有未分配对象的span
	Span* it = list.Begin();
	while (it != list.End())
	{
		if (it->_freeList != nullptr)
		{
			return it;//这里不需要改it的span中的属性,因为等最后分给threadcache了obj以后 才算其中的内存被分出去了 里面的usecount等才需要改;
		}
		else
		{
			it = it->_next;
		}
	}
	// 走到这里说没有空闲span了,只能找page cache要
	// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞(你要从page申请了 不能别的线程在这个桶释放)
	list._mtx.unlock();

	
	PageCache::GetInstance()->_pageMtx.lock();//整个pagecache结构可能会从index1~index129挨个操作每个桶 因此需要上大锁;
	Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
	span->_isUse = true;//分跟central的span标记已使用因为马上就要切分给obj用了
	span->_objSize = size;//每个span挂的固定小内存块obj大小size
	PageCache::GetInstance()->_pageMtx.unlock();

	// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span
	// 计算span的大块内存的起始地址和大块内存的大小(字节数)
	char* start = (char*)(span->_pageId << PAGE_SHIFT);//这里的_pageId是从底层按页申请内存的时候地址转换来的,现在需要用地址就转换回去;
	size_t bytes = span->_n << PAGE_SHIFT;
	char* end = start + bytes;

	// 把大块内存切成自由链表链接起来
	// 1、先切一块下来去做头,方便尾插(尾插原因,切出来一般是连续的,那么尾插给到span上挂小obj也是连续,提高内存命中率)
	span->_freeList = start;
	start += size;
	void* tail = span->_freeList;
	int i = 1;

	while (start < end)
	{
		++i;
		NextObj(tail) = start;
		tail = NextObj(tail); // tail = start;
		start += size;
	}

	NextObj(tail) = nullptr;

	// 切好span以后,需要把span挂到桶里面去的时候,再加锁
	list._mtx.lock();
	list.PushFront(span);
	return span;
}

// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();//上桶锁

	Span* span = GetOneSpan(_spanLists[index],size);
	assert(span);
	assert(span->_freeList);
	// 从span中获取batchNum个对象
	// 如果不够batchNum个,有多少拿多少
	start = span->_freeList;
	end = start;
	int n = 1;//n为实际拿到的数量,start直接给了所以起始值为1;
	for (int i = 0; i < batchNum - 1; i++)
	{
		if (NextObj(end) == nullptr) break;
		end = NextObj(end);
		++n;
	}
	//span被切出去给obj使用了  span的一些属性得改变了;
	span->_useCount += n;
	span->_freeList = NextObj(end);
	NextObj(end) = nullptr;
	_spanLists[index]._mtx.unlock();
	return n;
	
}

void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();
	while (start)
	{
		void* next = NextObj(start);

		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		//小obj头插入span中的_freeList
		NextObj(start) = span->_freeList;
		span->_freeList = start;
		span->_useCount--;

		// 说明span的切分出去的所有小块内存都回来了
		// 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并
		if (span->_useCount == 0)
		{
			_spanLists[index].Erase(span);
			span->_freeList = nullptr;
			span->_next = nullptr;
			span->_prev = nullptr;

			// 释放span给page cache时,使用page cache的锁就可以了
			// 这时把桶锁解掉 不影响其他线程对该index的central操作;
			_spanLists[index]._mtx.unlock();

			PageCache::GetInstance()->_pageMtx.lock();
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);//还给page和page是否需要合并其中的span减少内存碎片都在这函数里
			PageCache::GetInstance()->_pageMtx.unlock();

			_spanLists[index]._mtx.lock();
		}

		start = next;
	}

	_spanLists[index]._mtx.unlock();
}

PageMap.h

使用自己定义的PageMap哈希直接映射,避免stl的线程安全,提高效率;

#pragma once
#include"Common.h"

// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS;
	void** array_;

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap1() {
		//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
		size_t size = sizeof(void*) << BITS;
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const {
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) {
		array_[k] = v;
	}
};

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;

	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Node* root_;                          // Root of radix tree
	void* (*allocator_)(size_t);          // Memory allocator

	Node* NewNode() {
		Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}

	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
	}

	void set(Number k, void* v) {
		ASSERT(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}

			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
	}
};

PageCache.h

申请内存:

  1. 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有 则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没 有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。

  2. 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式 申请128页page span挂在自由链表中,再重复1中的过程。 3

  3. 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质 区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist 中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的 spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。

释放内存:

如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span ,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。

#pragma once

#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"

class PageCache
{
public:
	static PageCache* GetInstance()
	{
		return &_sInst;
	}

	// 获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);

	// 释放空闲span回到Pagecache,并合并相邻的span
	void ReleaseSpanToPageCache(Span* span);

	// 获取一个K页的span
	Span* NewSpan(size_t k);

	std::mutex _pageMtx;
private:
	SpanList _spanLists[NPAGES];
	ObjectPool<Span> _spanPool;//这里用上了之前编写的ObjectPool定长内存池  用来New(span)脱离malloc

	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	//std::map<PAGE_ID, Span*> _idSpanMap;
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

	PageCache()
	{}
	PageCache(const PageCache&) = delete;


	static PageCache _sInst;
};

PageCache.cpp

#include "PageCache.h"

PageCache PageCache::_sInst;

// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0);

	// 大于128 page的直接向堆申请
	if (k > NPAGES - 1)
	{
		void* ptr = SystemAlloc(k);
		//Span* span = new Span;
		
		Span* span = _spanPool.New();

		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;

		//_idSpanMap[span->_pageId] = span;
		_idSpanMap.set(span->_pageId, span);

		return span;
	}

	// 先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
		for (PAGE_ID i = 0; i < kSpan->_n; ++i)
		{
			//_idSpanMap[kSpan->_pageId + i] = kSpan;
			_idSpanMap.set(kSpan->_pageId + i, kSpan);
		}
		return kSpan;
	}

	// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
	for (size_t i = k + 1; i < NPAGES; ++i)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();//nSpan代表切割以后剩下的span,他还是未使用的,还在pagecache中是连续的,所以映射首尾即可
			//Span* kSpan = new Span;
			Span* kSpan = _spanPool.New();//kSpan代表从某个大内存n+k Span中切出来的kSpan,他要给到central之后进行obj切割,进而往thread给;

			// 在nSpan的头部切一个k页下来
			// k页span返回
			// nSpan再挂到对应映射的位置
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;

			_spanLists[nSpan->_n].PushFront(nSpan);

			
			// 存储nSpan的首尾页号跟nSpan映射,方便page cache回收内存时进行的合并查找
		
			//_idSpanMap[nSpan->_pageId] = nSpan;
			//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
			_idSpanMap.set(nSpan->_pageId, nSpan);
			_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);

			// 建立id和kSpan的映射,方便central cache回收小块内存时,查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; ++i)
			{
				//_idSpanMap[kSpan->_pageId + i] = kSpan;
				_idSpanMap.set(kSpan->_pageId + i, kSpan);
			}

			return kSpan;
		}
	}

	// 走到这个位置就说明后面没有大页的span了
	// 这时就去找堆要一个128页的span
	//Span* bigSpan = new Span;
	Span* bigSpan = _spanPool.New();
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	return NewSpan(k);
}

//将一个obj小内存块它对应的Span*找到(通过转换成page_id再map出Span*)
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);

	/*std::unique_lock<std::mutex> lock(_pageMtx);
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{	
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}*/
	auto ret = (Span*)_idSpanMap.get(id);
	assert(ret != nullptr);
	return ret;
}

//从central向page归还span大块内存(这个Span的 usecount==0了即分出去的小obj都还回来了),归还Span并尝试合并!
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	// 大于128 page的直接还给堆
	if (span->_n > NPAGES - 1)
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);

		return;
	}

	// 对span前后的页,尝试进行合并,缓解内存碎片问题
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		
		// 前面的页号没有,不合并了
		auto ret = (Span*)_idSpanMap.get(prevId);
		if (ret == nullptr)
		{
			break;
		}

		// 前面相邻页的span在使用,不合并了
		Span* prevSpan = ret;
		if (prevSpan->_isUse == true)
		{
			break;
		}

		// 合并出超过128页的span没办法管理,不合并了
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}

		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		_spanLists[prevSpan->_n].Erase(prevSpan);
		//delete prevSpan;
		_spanPool.Delete(prevSpan);
	}

	// 向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		/*auto ret = _idSpanMap.find(nextId);
		if (ret == _idSpanMap.end())
		{
			break;
		}*/

		auto ret = (Span*)_idSpanMap.get(nextId);
		if (ret == nullptr)
		{
			break;
		}

		Span* nextSpan = ret;
		if (nextSpan->_isUse == true)
		{
			break;
		}

		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}

		span->_n += nextSpan->_n;

		_spanLists[nextSpan->_n].Erase(nextSpan);
		//delete nextSpan;
		_spanPool.Delete(nextSpan);
	}
	//合并的大内存块 插回Pagecache中 设置为未使用(未分到central中)状态;
	_spanLists[span->_n].PushFront(span);
	span->_isUse = false;
	//_idSpanMap[span->_pageId] = span;
	//_idSpanMap[span->_pageId+span->_n-1] = span;

	//记录前后页号方便下次合并;
	_idSpanMap.set(span->_pageId, span);
	_idSpanMap.set(span->_pageId + span->_n - 1, span);
}

ConcurrentAlloc.h

#pragma once

#include "Common.h"
#include "ThreadCache.h"
#include "PageCache.h"
#include "ObjectPool.h"

//tcmallo
static void* ConcurrentAlloc(size_t size)
{
	//一次申请内存大于thread最大的256kb,则直接向page层按页直接申请,不需要经过Thread层;
	if (size > MAX_BYTES)
	{
		size_t alignSize = SizeClass::RoundUp(size);
		size_t kpage = alignSize >> PAGE_SHIFT;

		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kpage);
		span->_objSize = size;//这个span就是为了_objsize单位内存而申请的
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		return ptr;
	}
	else
	{
		// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
		if (pTLSThreadCache == nullptr)
		{
			static ObjectPool<ThreadCache> tcPool;//只需要一个tcpool,用于调用New,所以设置成静态;
			//pTLSThreadCache = new ThreadCache;
			pTLSThreadCache = tcPool.New();
		}

		return pTLSThreadCache->Allocate(size);
	}
}

//tcfree
static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;

	if (size > MAX_BYTES)//证明这个Span直接是按页从page申请的_objsize>MAX_SIZE,没经过thread,那么直接ReleaseSpanToPageCache
	{
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

5.测试文件:

Test.cpp

#include"ConcurrentAlloc.h"

// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(malloc(16));
					v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime.load());

	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime.load());

	printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime.load() + free_costtime.load());
}


// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(ConcurrentAlloc(16));
					v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime.load());

	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime.load());

	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes.load(), malloc_costtime + free_costtime.load());
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" << endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;

	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" << endl;

	return 0;
}

malloc与tcmallo测试对比结果

总结

在整体框架编写完毕后,进行测试对比并不能体现tcmalloc比malloc效率高,如下图;但是在tcmalloc进行 map(Span*,PAGE_ID),这个映射采用基数树的优化后效率大大提高,超过了malloc。

不高效存在性能瓶颈的原因

根据测试,map的find越慢,会导致其lock()越慢,因此map中的find占了整体的15%,加锁解锁的过程占据了程序运行的20%times,这就是未优化的性能瓶颈;优化前Span*和page_id的映射使用stl标准库中的unorder_map,是非线程安全的,需要加锁保证线程安全; 优化了后使用PAGE_MAP 设计原理是直接开满page_id所有范围的空间,采取直接映射;这样即便是多线程也不需要加锁;

不加锁的原因

1.两个线程对map中不同的位置读写:因为两个线程在对基数树map一个读,一个写的时候,相应空间早都开好了,不会改变结构,因此读写互不影响,一个在自己的地方读,一个在自己的地方写,而STL容器中map红黑树写入会旋转,unorder_map哈希写入也涉及增容,结构有可能会改变,因此两个位置的读写有可能互相影响导致线程不安全;

2.两个线程对map中相同的位置写:因为写只会在NewSpan和ReleaseSpanToPageCache中,一个是central没有span了从pagecache申请,一个是central 把没使用的usecount==0的span大块内存还给paghecache,因为page只有有span和无span两种状态这两个函数不可能在两个线程中同时执行

3.两个线程对map中相同的位置读:因为读只会在ConcurrentFree和ReleaseListToSpans中,对于同一个位置index,肯定是先ConcurrentFree(void*obj),之后objfree到一定的量(_freeLists[index].Size() >= _freeLists[index].MaxSize(慢开始的maxsize))时,再调用ReleaseListToSpans将freelist[index]从threadcache还给centralcache的,也不会两个线程同时发生;

  • stl的map中的find会依次遍历O(n),而直接映射的时间效率是O(1),相当于以部分空间换取时间的一种举措;

优化了stl中map.find()慢的问题同时不需要加锁了,两个主要问题在牺牲点空间的情况下完美解决,因此性能瓶颈得到优化,tcmalloc高效与malloc;

项目源代码Github链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1125939.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

橙河网络:国外问卷调查赚钱的项目可靠吗?

国外问卷调查项目是可靠的&#xff0c;是一个长期稳定的互联网项目。 大家好&#xff0c;我是橙河网络&#xff0c;今天聊一聊国外问卷调查赚钱的项目可靠吗&#xff1f; 在海外地区&#xff0c;很多公司和机构&#xff0c;它们为了收集一些关于产品和服务的消费者意见&#…

读懂Json文件[妈妈再也不用担心我不读懂了]

第一次见到它&#xff0c;我特别害怕&#xff0c; 很大一个文件&#xff0c;还是二进制的&#xff0c;是什么意思呢&#xff1f;该文件是深度学习模型训练完成之后由模型的输出数据保存的结果数据&#xff1b; 在线JSON编译器 JSON Editor Online 把文件传上去&#xff01; …

React-native-camera 在小米手机上拍照查看闪退

场景&#xff1a;为实现可拍照和录像的相机用react-native-camera这个库手写一个相机&#xff0c;发现了拍出来的图片在小米10上查看闪退 根据手机后台捕获的错误信息是什么玩意太大了&#xff08;之前还以为是图片显示组件的问题&#xff09; 改进&#xff1a;相机吊起的时候…

unity中使用protobuf工具将proto文件转为C#实体脚本

unity中使用protobuf工具将proto文件转为C#实体脚本 介绍优点缺点Protobuf 为什么比 XML 快得多&#xff1f;Protobuf的EncodingProtobuf封解包的过程通常编写一个Google Protocol Buffer应用需要以下几步&#xff1a; Protostuff是什么Protobuf工具总结 介绍 protobuf也就是G…

第三章Maven依赖的特性-基础篇

文章目录 Maven依赖核心配置文件pom.xml依赖的范围概念compile 和 test 对比compile 和 provided 对比结论 测试验证 compile 范围对 main 目录有效验证test范围对main目录无效验证test和provided范围不参与服务器部署验证provided范围对测试程序有效 依赖的传递概念传递的原则…

计算机体系结构实验三-缓存一致性

最近在看CMU15-418&#xff0c;LECTURE10-11就是讲两个缓存一致性协议&#xff0c;刚好这部分内容在实验中学习过&#xff0c;在实验中监听式协议提到的是MSI协议(modified&#xff0c;shared&#xff0c;invalid)&#xff0c;lecture中还讲了优化的版本MESI(应该是实际使用的协…

pytorch 入门 (四)案例二:人脸表情识别-VGG16实现

实战教案二&#xff1a;人脸表情识别-VGG16实现 本文为&#x1f517;小白入门Pytorch内部限免文章 参考本文所写记录性文章&#xff0c;请在文章开头注明以下内容&#xff0c;复制粘贴即可 &#x1f368; 本文为&#x1f517;小白入门Pytorch中的学习记录博客&#x1f366; 参…

CSMM软件能力成熟度评估

CSMM认证&#xff0c;又称为“软件能力成熟度评估”&#xff0c;也有地方称为“CSMM软件能力成熟度模型评估国家标准认证”&#xff0c;也被民间喊作“中国版CMMI认证”。该标准于2021年6月8日发布&#xff0c;是我国自主标准&#xff0c;适合中国国情以及中国软件企业的特点。…

2023腾讯云服务器价格表(轻量/CVM/免费/GPU)

腾讯云服务器租用价格表&#xff0c;轻量应用服务器、云服务器CVM、免费云服务器申请和GPU云服务器配置报价&#xff0c;轻量2核2G4M价格108元一年、2核4G6M带宽159元一年、4核8G10M优惠价425元一年、8核16G14M优惠价1396元一年、16核32G20M价格2775.99元一年&#xff0c;云服务…

Cesium冷知识:API中显示私有方法

在Cesium.js源码中&#xff0c; 某些类或方法的注释中含有private这个“私有”标识 会导致不会在API文档中显示 &#xff08;这是jsdoc的规范&#xff09; 解决方法&#xff1a; Ceisum.js 1.110.0版本的解决方案&#xff1a; 在gulpfile.js中的buildDocs方法中&#xff0…

vscode Coder Runner 运行C++

1. 设置Code Runner 2. 防止输入读不到&#xff0c;把在终端运行勾上。 3. 设置minw/bin的环境变量 安装mingw教程&#xff1a;https://blog.csdn.net/fancy_male/article/details/133992000 4. 见图

ArGIS Engine专题(15)之GP模型在地图服务与地图服务之间实现叠置分析

前一篇文章已经介绍过导入要素范围与地图服务的叠加分析,相当于单要素与多要素之间的分析,这篇文章介绍地图服务与地图服务之间的叠加分析,即是多要素有多要素之间的相交分析,功能基本类似。 一、结果预览 二、需求简介 以下是一些常见的业务场景: (1)空间规划和土地…

淘宝商品详情API接口(H5端和APP端),淘宝详情页,商品属性接口,商品信息查询

一、接口参数说明&#xff1a;提取淘宝商品详情页各项数据&#xff0c;包含skuid、价格、收藏数、加购数、月销售量、主图、标题、详情页图片等页面上可以看奥的数据都可以拿到。 点击获取key和secret 二、使用场景 1、商品销售情况分析&#xff0c;根据销量调整活动方案&am…

【linux系统】如何在服务器上安装Anaconda

文章目录 1. 安装Anconda1.1. 下载Anaconda安装包1.2. 安装Anaconda1.2.1. 点击回车&#xff08;Enter&#xff09;1.2.2. 添加环境变量1.2.3. 激活环境变量 1.3. 检查是否安装成功 2. Anaconda安装pytorch2.1. 创建虚拟环境2.2. 激活(进入)虚拟环境2.3. 安装pytorch 1. 安装An…

51单片机实现换能器超声波测水深

一&#xff0c;超声波换能器定义&#xff1a; 定义1&#xff1a;可把电能、机械能或声能从一种形式转换为另一种形式的能的装置。 所属学科&#xff1a;测绘学下的测绘仪器。 定义2&#xff1a;能量转换的器件。在水声领域中常把声呐换能器、水声换能器、电声换能器统称换能器。…

电脑出现vcomp140.dll错误,五种解决办法分享

电脑出现由于找不到 vcomp140.dll 无法继续执行代码的问题&#xff0c;通常是因为系统缺失了 Microsoft Visual C 2015 Redistributable 导致的。为了解决这个问题&#xff0c;您可以尝试以下 5 个解决方案&#xff1a; 方案1&#xff1a;使用dll修复工具&#xff1a;下载vcomp…

【图灵诸葛】jvm笔记

2023年10月23日14:04:44 jvm 1.jdk体系结构图回顾(Av333129672,P1) jdk jre 底层是hotspot jvm 2.java虚拟机内部组成(Av333129672,P2) 堆 方法区 执行引擎 类加载 本地方法栈 线程栈&#xff08;虚拟机栈&#xff09; 3.java虚拟机栈讲解(Av333129672,P3) 程序计数器&#xf…

Python自动化测试框架之unittest使用详解!

这篇文章主要介绍了Python接口自动化浅析unittest单元测试原理,文中描述了单元测试&#xff0c;unittest模块特性、大致流程、源码及实战例子这几个模块&#xff0c;有需要的朋友可以借鉴参考下 以下主要介绍unittest特性、运行流程及实际案例。 一、单元测试三连问 1、什么是…

二叉树的各类实现判断二叉树的递归套路

如何判断一颗二叉树是否是搜索二叉树&#xff1f; 搜索二叉树 每个子树头节点的左孩子比它小&#xff0c;右孩子比它大 经典的搜索二叉树没有重复的数 判断 将二叉树按照中序遍历&#xff0c;判断是否为升序 1、先将整棵树中序遍历再判断是否升序 //中序遍历public stat…

如何获取ABAP的程序事件顺序的调用堆栈

难道有激情总结下之前做过的事情&#xff0c;话不多说直接上图 重点在于此函数 CALL FUNCTION SYSTEM_CALLSTACK IMPORTING ET_CALLSTACK L_CSTACK_TAB. " internal table