从零实现高并发内存池

news2024/10/16 9:12:16

目录

  • 1. 项目介绍
    • 1.1 这个项目具体功能是什么?
    • 1.2 本项目的知识储备
  • 2. 什么是内存池
    • 2.1 池化技术
    • 2.2 内存池主要解决的问题
    • 2.3 malloc
  • 3. 定长内存池设计
  • 4. 高并发内存池整体框架设计
    • 4.1 Thread Cache的设计思路
    • 4.2 Central Cache的设计思路
    • 4.3 Page Cache的设计思路
  • 5. 项目具体实现
    • 5.1 上述三个模块公共类的实现
    • 5.2 定长内存池的实现
    • 5.3 Thread cache模块的实现
    • 5.4 TLS--thread local storage的实现
    • 5.5 Central cache模块的实现
    • 5.6 Page cache模块的实现
  • 6. 申请内存的流程解析
    • 6.1 申请内存和计算对齐大小和所属哈希桶详解
    • 6.2 慢反馈算法来获得要需要申请对象的数量
    • 6.3 从CentralCache中获得对象
    • 6.4 从PageCache中获取对象
    • 6.5 切分申请好的span
    • 6.6 从span中取batchNum个对象
  • 7. 释放内存的流程解析
    • 7.1 获取对象到span的映射
    • 7.2 将对象插入自由链表桶
    • 7.3 释放对象到CentralCache中
    • 7.4 释放对象到PageCache
  • 8. 代码测试
  • 9. 性能分析以及优化
    • 9.1 性能分析
    • 9.2 性能优化
  • 10. 项目总结

1. 项目介绍

1.1 这个项目具体功能是什么?

当前项目是实现⼀个高并发的内存池,他的原型是google的⼀个开源项目tcmalloc,tcmalloc全称 Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内 存分配相关的函数(malloc、free)。

本项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出⼀个自己的高并发内存池,目的就是学习tcamlloc的精华。

1.2 本项目的知识储备

本项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁 等等方面的知识。

2. 什么是内存池

2.1 池化技术

  1. 池化技术,就是程序先向系统申请过量的资源,然后自己管理,当程序中需要申请内存时,不是直接向操作系统申请,而是直接从内存池中获取,释放内存时也不是将内存返回给操作系统,而是返回内存池中。这就是内存池的主要功能。
  2. 因为每次申请该资源都有较大的开销,这样提前申请好了,使用时就会非常快捷,能够大大提高程序运行效率。
  3. 在计算机中有很多使用这种池技术的地方,例如线程池、连接池等。
  4. 以服务器上的线程池为例,它的主要思想是:先启动若⼲数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。

2.2 内存池主要解决的问题

内存池主要解决的肯定就是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决⼀下内存碎片的问题。那么什么是内存碎片呢?

内存碎片分为外碎片和内碎片。外部碎片是⼀些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足⼀些的内存分配
申请需求。内部碎片是由于⼀些对齐的需求,导致分配出去的空间中⼀些内存无法被利用。

2.3 malloc

C++中动态申请内存都是通过malloc去申请的,但实际上我们并不是直接去堆中获取内存的,而malloc就是一个内存池。malloc() 相当于向系统 “批发” 了一块较大的内存空间,然后“零售” 给程序使用,当全部使用完或者程序有大量内存需求时,再根据需求向操作系统申请内存。

一文了解,Linux内存管理,malloc、free 实现原理

3. 定长内存池设计

(1)开辟内存:

  • 使用malloc开辟一大块内存,让_memory指针指向这个大块内存
  • _memory 设置为char* 类型,是为了方便切割时_memory向后移动多少字节数。

(2)申请内存:

  • 将_memory强转为对应类型,然后赋值给对方,_memory指针向后移动对应字节数即可。
  • 如果有存在已经切割好的小块内存,则优先使用小块内存。

(3)释放内存:

  • 用类型链表的结构来进行存储。
  • 用当前小块内存的头4字节存储下一个小块内存的地址,最后用_freeList指针指向第一个小块内存的地址(并不是将内存释放给操作系统)
  • 所以开辟内存时,开辟的内存大小必须大于或等于一个指针类型的大小。


(4)代码实现:

#pragma once
#include <iostream>
#include <vector>

using std::cout;
using std::endl;

template<class T>
class ObjectPool
{
public:
    T* New()
    {
        T* obj = nullptr;
        // 如果⾃由链表有对象,直接取⼀个
        if(_freeList)
        {
            obj = (T*)_freeList;
            _freeList = *((void**)_freeList);
        }
        else
        {
            if(_leftBytes < sizeof(T))
            {
                _leftBytes = 128 * 1024;
                _memory = (char*)malloc(_leftBytes);
                if(_memory == nullptr)
                {
                    throw std::bad_alloc();
                }
            }

            obj = (T*)_memory;
            //根据编译器确定指针类型大小
            size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
            _memory += objsize;
            _memory -= objsize;
        }

        // 使⽤定位new调⽤T的构造函数初始化
        new(obj)T;
        return obj;
    }

    void Delete(T* obj)
    {
        obj->~T();

        // 头插到freeList
        *((void**)obj) = _freeList;
        _freeList = obj;
    }

private:
    char* _memory = nullptr;   // 指向内存块的指针
    void* _freeList = nullptr; // 管理还回来的头指针
    size_t _leftBytes = 0;     // 内存块中剩余字节大小

};

//以下是测试性能的代码
struct TreeNode
{
    int _val;
    TreeNode *_left;
    TreeNode *_right;
    TreeNode()
        : _val(0), _left(nullptr), _right(nullptr)
    {}
};

void TestObjectPool()
{
    // 申请释放的轮次
    const size_t Rounds = 3;
    // 每轮申请释放多少次
    const size_t N = 100000;
    size_t begin1 = clock();
    std::vector<TreeNode *> v1;
    v1.reserve(N);
    for (size_t j = 0; j < Rounds; ++j)
    {
        for (int i = 0; i < N; ++i)
        {
            v1.push_back(new TreeNode);
        }
        for (int i = 0; i < N; ++i)
        {
            delete v1[i];
        }
        v1.clear();
    }

    size_t end1 = clock();
    ObjectPool<TreeNode> TNPool;
    size_t begin2 = clock();
    std::vector<TreeNode *> v2;
    v2.reserve(N);
    for (size_t j = 0; j < Rounds; ++j)
    {
        for (int i = 0; i < N; ++i)
        {
            v2.push_back(TNPool.New());
        }
        for (int i = 0; i < 100000; ++i)
        {
            TNPool.Delete(v2[i]);
        }
        v2.clear();
    }

    size_t end2 = clock();
    cout << "new cost time:" << end1 - begin1 << endl;
    cout << "object pool cost time:" << end2 - begin2 << endl;
}

(5)多次运行结果:

(6)代码解析:

4. 高并发内存池整体框架设计

(1)现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc 本身其实已经很优秀,那么我们项目的原型 tcmalloc 就是在多线程高并发的场景下更胜一筹,所以我们实现的内存池需要考虑以下几方面的问题。

  • 性能问题。
  • 内存碎片问题。
  • 多线程环境下,锁竞争问题。

(2)高并发内存池主要由如下3个部分组成:

  • Thread Cache线程缓存是每个线程独有的用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个 Cache,这也就是这个并发线程池高效的地方。
  • Central Cache中心缓存是所有线程所共享,Thread Cache 是按需从 Central Cache 中获取的对象。Central Cache 合适的时机回收 Thread Cache 中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。Central Cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有 Thread Cache 的没有内存对象时才会找 Central Cache,所以这里竞争不会很激烈。
  • Page Cache:页缓存是在 Central Cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,Central Cache没有内存对象时,从 Page Cache 分配出一定数量的 page,并切割成定长大小的小块内存,分配给 Central Cache。当一个 span 的几个跨度页的对象都回收以后,Page Cache 会回收 Central Cache 满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。

4.1 Thread Cache的设计思路

(1)Thread Cache 是哈希桶结构,每个桶是一个按桶位置映射对应内存块对象大小的 FreeList 自由链表。采用 TLS 技术使每个线程都有一个 Thread Cache 对象,这样每个线程在这里获取对象和释放对象时是无锁的。

(2)申请内存的过程:

  1. 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
  2. 如果自由链表_freeLists[i]中有对象,则直接Pop⼀个内存对象返回。
  3. 如果_freeLists[i]中没有对象时,则批量从central cache中获取⼀定数量的对象,插⼊到自由链表并返回⼀个对象。
  4. 线程申请内存时,会有不同规格的内存申请(4字节、5字节等),根据范围划定不同的自由链表,设计多个自由链表管理不同规格的内存小块。实质就相当于使用多个定长内存池的自由链表。

(3)内存对齐规则:

  • 每个内存小块采用向上对齐原则(可能会出现多申请内存的情况,这就是内碎片)例如:
    • 需要9字节,则开辟一个大小为2个8字节的空间的节点
    • 需要100字节,则开辟一个大小为13个8字节的空间的节点。
  • 整体控制在最多10%左右的内碎片浪费,总计设计208个桶:
申请的内存数对齐数哈希桶分区
[1, 128]8 bytefreelist[0, 16)
[128+1, 1024]16 bytefreelist[16, 72)
[1024+1, 8*1024]128 bytefreelist[72, 128)
[81024+1, 641024]1024 bytefreelist[128, 184)
[641024+1, 2561024]8192 bytefreelist[184, 208)

  • 注意:_freeLists是一个数组,每个元素都是自由链表类型(即存储自由链表的头结点)

(4)释放内存的过程:

  1. 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
  2. 具体方法是:用切分好的小块内存的前4字节或8字节来存储下一个小块内存的地址。插入节点时,采用头插的方式。
  3. 链表的长度过长,则回收⼀部分内存对象到central cache。

(6)TLS–thread local storage:

  • TLS:thread local storage 线程本地存储(linux和Windows下有各自的TLS)
  • TLS是一种变量的存储方式,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问,这样就保证了线程的独立性。
    • 静态TLS使用方法:static __thread ThreadCache* pTLSThreadCache = nullptr;
    • 声明了一个 __thread 类型的变量,会为每一个线程创建一个单独的拷贝。

原理:

  • 在x86 CPU上,将为每次引用的静态TLS变量生成3个辅助机器指令
  • 如果在进程中创建子线程,那么系统将会捕获它并且自动分配一另一个内存块,以便存放新线程的静态TLS变量。

TLS–thread local storage文章:https://zhuanlan.zhihu.com/p/142418922

(5)Thread Cache代码整体框架:

#pragma once
#include "Common.hpp"
#include "CentralCache.hpp"

class ThreadCache
{
public:
    // 申请内存对象
	void* Allocate(size_t size);

    // 释放内存对象
	void Deallocate(void* ptr, size_t size);

	// 从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);
	
	// 当释放对象而链表过长时,将回收的内存还给central cache
	void ListTooLong(FreeList& list, size_t size);

private:
    FreeList _freelists[NFREELIST];

};

// TLS thread local storage
static __thread ThreadCache* pTLSThreadCache = nullptr;

4.2 Central Cache的设计思路

(1)Central cache也是一个哈希桶结构,每个哈希桶位置挂载的是SpanList自由链表结构。Span管理的是以页为单位的大块内存(一页为8kb(32位系统下))。每个Span中的大内存根据映射关系被切成了一个个的小块内存对象,然后挂载在Span上。因为中心缓存是所有线程共享的,只需要定义一个对象,所以这里需要将 Central cache 设计为单例模式(这里采用饿汉模式的设计方法)。

注意:

  • span是双向链表,而span下挂载的小块内存对象是单链表。
  • 中心缓存需要加桶锁。
  • _spanLists 是一个数组,数组中每个元素都是一个span自由链表的_head头指针。
  • 每个span又是一个单向自由链表。

(2)申请内存的过程:

  1. 当 Thread Cache 中没有内存时,就会批量向 Central Cache 申请一些内存对象,这里的批量获取对象的数量使用了类似网络 tcp 协议拥塞控制的慢开始算法;Central Cache 也有一个哈希映射的 SpanList,SpanList 中挂着 span,从span中取出对象给 Thread Cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
  2. Central Cache 映射的 SpanList 中所有 span 的都没有内存以后,则需要向 Page Cache 申请一个新的 span 对象,拿到 span 以后将 span 管理的内存按大小切好作为自由链表链接到一起。然后从 span 中取对象给 Thread Cache。
  3. Central Cache 中挂的 span 中 use_count 记录分配了多少个对象出去,分配一个对象给Thread Cache,就 ++use_count。

(3)释放内存的过程:

  1. 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时就- -use_count。
  2. 当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。

(4)CentralCache 代码整体框架:

#pragma once
#include "Common.hpp"
#include "PageCache.hpp"

// 单例模式
class CentralCache
{
public:
    static CentralCache* GetInstance()
    {
        return &_sInst;
    }

    // 获取一个非空的span
	Span* GetOneSpan(SpanList& list, size_t size);

	// 从中心缓存获取一定数量的对象给thread cache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
	
	// 将⼀定数量的对象释放到span跨度
 	void ReleaseListToSpans(void* start, size_t byte_size);

private:
	CentralCache()
	{}

	CentralCache(const CentralCache&) = delete;

	static CentralCache _sInst;
	SpanList _spanLists[NFREELIST];

};

4.3 Page Cache的设计思路

(1)Page cache中也是哈希桶结构,但是每个节点存储的都是span。因为页缓存是所有线程共享的,只需要定义一个对象,所以这里将 page cache 设计为单例模式(这里采用饿汉模式的设计方法)。

注意:Page cache需要加整体锁(因为是所有线程共享的)。

(2)申请内存的过程:

  • 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找⼀个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后⾯没有挂 span,则向后⾯寻找更大的span,假设在10页page位置找到⼀个span,则将10页page span分裂 为⼀个4页page span和⼀个6页page span。
  • 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等放式申请128页page span挂在自由链表中,再重复前面的过程。
  • 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache⼀样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。

(3)释放内存的过程:

  • 如果central cache释放回⼀个span,则依次寻找span的前后page id的没有在使用的空闲span, 看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。

(4)Page Cache 代码整体框架:

#pragma once
#include "Common.hpp"

class PageCache
{
public:
	static PageCache* GetInstance()
	{
		return &_sInst;
	}

	// 获取一个K页的span
	Span* NewSpan(size_t k);
	
	// 获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);
 
	// 释放空闲span回到给page cache,并合并相邻的span
	void ReleaseSpanToPageCache(Span* span);

public:
	std::mutex _pageMtx;

private:
	SpanList _spanLists[NPAGES];

	PageCache()
	{}

	PageCache(const PageCache&) = delete;
	std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	static PageCache _sInst;

};

5. 项目具体实现

5.1 上述三个模块公共类的实现

(1)主要实现的功能:

  • 申请空间函数。
  • 管理切分好的小对象的自由链表。
  • Sizeclass类:功能是实现内存对齐以及计算映射到哪个自由链表桶。
  • Span类的实现。
  • SpanList类用来链接Span类。

(2)具体实现:

#pragma once
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <ctime>
#include <cassert>
#include <unordered_map>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>

using std::cout;
using std::endl;

static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13;

typedef unsigned long long PAGE_ID;

// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	void *ptr = mmap(NULL, kpage << 13, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
#endif

	if (ptr == nullptr)
	{
        throw std::bad_alloc();
    }
    
	return ptr;
}

inline static void SystemFree(void* ptr, size_t kpage)
{
#ifdef _WIN32  //windows
	VirtualFree(ptr, 0, MEM_RELEASE);
#else  //linux
	munmap(ptr, kpage << 13);
#endif
}

static void*& NextObj(void* obj)
{
	return *(void**)obj;
}

// 管理切分好的小对象的自由链表
class FreeList
{
public:
    void push(void* obj)
    {
        assert(obj);
        //*(void**)obj = _freeList;
        NextObj(obj) = _freeList;
        _freeList = obj;
        _size++;
    }

    void PushRange(void* start, void* end, size_t n)
    {
        //*(void**)end = _freeList;
        NextObj(end) = _freeList;
        _freeList = start;

        _size += n;
    }

    void PopRange(void*& start, void*& end, size_t n)
	{
        assert(n <= _size);
        start = _freeList;
        end = start;

        for(int i = 0; i < n - 1; i++)
        {
            end = NextObj(end);
        }

        _freeList = NextObj(end);
        NextObj(end) = nullptr;
        _size -= n;
    }

    void* pop()
    {
        assert(_freeList);

        void* ret = _freeList;
        _freeList = NextObj(ret);
        _size--;
        //_freeList = *(void**)_freeList;
        
        return ret;
    }

    bool empty()
    {
        return _freeList == nullptr;
    }

    size_t& GetmaxSize()
    {
        return _maxSize;
    }

    size_t Getsize()
    {
        return _size;
    }

private:
	void* _freeList = nullptr;
    size_t _maxSize = 1;
    size_t _size = 0;

};

class Sizeclass
{
public:
    // 整体控制在最多10%左右的内碎片浪费
	// [1,128]					8byte对齐	     freelist[0,16)
	// [128+1,1024]				16byte对齐	     freelist[16,72)
	// [1024+1,8*1024]			128byte对齐	     freelist[72,128)
	// [8*1024+1,64*1024]		1024byte对齐     freelist[128,184)
	// [64*1024+1,256*1024]		8*1024byte对齐   freelist[184,208)

    static inline size_t _RoundUp(size_t bytes, size_t alignNum)
	{
		return ((bytes + alignNum - 1) & ~(alignNum - 1));
	}

    static size_t RoundUp(size_t size)
    {
        if(size <= 128)
        {
            return _RoundUp(size, 8);
        }
        else if(size <= 1024)
        {
            return _RoundUp(size, 16);
        }
        else if(size <= 8 * 1024)
        {
            return _RoundUp(size, 128);
        }
        else if(size <= 64 * 1024)
        {
            return _RoundUp(size, 1024);
        }
        else if(size <= 256 * 1024)
        {
            return _RoundUp(size, 8 * 1024);
        }
        else
        {
            return _RoundUp(size, 1 << PAGE_SHIFT);
        }
    }

    static inline size_t _Index(size_t bytes, size_t align_shift)
	{
		return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
	}

    // 计算映射的哪一个自由链表桶
	static size_t Index(size_t bytes)
	{
        assert(bytes <= MAX_BYTES);

        // 每个区间有多少个链节
		static int group_array[4] = { 16, 56, 56, 56 };
        if(bytes <= 128)
        {
            return _Index(bytes, 3);
        }
        else if(bytes <= 1024)
        {
            return _Index(bytes - 128, 4) + group_array[0];
        }
        else if(bytes <= 8 * 1024)
        {
            return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];
        }
        else if(bytes <= 64 * 1024)
        {
            return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
        }
        else if(bytes <= 256 * 1024)
        {
            return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
        }
        else
        {
            assert(false);
			return -1;
        }
    }

    // 一次thread cache从中心缓存获取多少个
	static size_t NumMoveSize(size_t size)
	{
		assert(size > 0);

		// [2, 512],一次批量移动多少个对象的(慢启动)上限值
		// 小对象一次批量上限高
		// 小对象一次批量上限低
		int num = MAX_BYTES / size;
		if (num < 2)
		{
            num = 2;
        }

		if (num > 512)
		{
            num = 512;
        }

		return num;
	}

    // 计算一次向系统获取几个页
	// 单个对象 8byte
	// ...
	// 单个对象 256KB
	static size_t NumMovePage(size_t size)
	{
		size_t num = NumMoveSize(size);
		size_t npage = num*size;

		npage >>= PAGE_SHIFT;
		if (npage == 0)
		{
            npage = 1;
        }

		return npage;
	}
};

// 管理多个连续页大块内存跨度结构
struct Span
{
    PAGE_ID _pageId = 0; // 大块内存起始页的页号
	size_t  _n = 0;      // 页的数量

	Span* _next = nullptr;	// 双向链表的结构
	Span* _prev = nullptr;

    size_t _objSize = 0;  // 切好的小对象的大小
	size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数
	void* _freeList = nullptr;  // 切好的小块内存的自由链表

    bool _isUse = false;
};

class SpanList
{
public:
    SpanList()
    {
        _head = new Span();
        _head->_next = _head;
        _head->_prev = _head;
    }

    Span* Begin()
	{
		return _head->_next;
	}

	Span* End()
	{
		return _head;
	}

	bool empty()
	{
		return _head->_next == _head;
	}

    void pushfront(Span* span)
    {
        insert(Begin(), span);
    }

    Span* popfront()
	{
		Span* front = _head->_next;
		Erase(front);
		return front;
	}
    
    void insert(Span* pos, Span* newSpan)
    {
        assert(pos);
		assert(newSpan);

        Span* prev = pos->_prev;
        prev->_next = newSpan;
        newSpan->_prev = prev;
        newSpan->_next = pos;
        pos->_prev = newSpan;
    }

    void Erase(Span* pos)
	{
		assert(pos);
		assert(pos != _head);

		Span* prev = pos->_prev;
		Span* next = pos->_next;

		prev->_next = next;
		next->_prev = prev;
	}

public:
    std::mutex _mutex; // 桶锁

private:
    Span* _head;

};

5.2 定长内存池的实现

(1)为什么需要定长内存池?

  • 因为在本项目中需要用到new这个函数,new本质上还是malloc,所以在本项目当中需要用到new的地方会减少一点性能,所以使用定长内存池可以提高一点性能。

(2)具体实现:

#pragma once
#include "Common.hpp"

template<class T>
class ObjectPool
{
public:
	T* New()
	{
		T* obj = nullptr;

		// 优先把还回来内存块对象,再次重复利用
		if (_freeList)
		{
			void* next = *((void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
		}
		else
		{
			// 剩余内存不够一个对象大小时,则重新开大块空间
			if (_remainBytes < sizeof(T))
			{
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remainBytes);
				_memory = (char*)SystemAlloc(_remainBytes >> 13);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}

			obj = (T*)_memory;
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			_memory += objSize;
			_remainBytes -= objSize;
		}

		// 定位new,显示调用T的构造函数初始化
		new(obj)T;

		return obj;
	}

	void Delete(T* obj)
	{
		// 显示调用析构函数清理对象
		obj->~T();

		// 头插
		*(void**)obj = _freeList;
		_freeList = obj;
	}

private:
	char* _memory = nullptr; // 指向大块内存的指针
	size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数

	void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针
};

5.3 Thread cache模块的实现

(1)实现的主要功能:

  • 申请和释放内存对象。
  • 当空间不足时从中心缓存获取对象。

(2)具体实现:

#pragma once
#include "Common.hpp"
#include "CentralCache.hpp"

class ThreadCache
{
public:
    // 申请内存对象
	void* Allocate(size_t size)
    {
        assert(size <= MAX_BYTES);

        size_t alignSize = Sizeclass::RoundUp(size);
        size_t index = Sizeclass::Index(size);

        if(!_freelists[index].empty())
        {
            return _freelists[index].pop();
        }
        else
        {
            return FetchFromCentralCache(index, alignSize);
        }
    }

    // 释放内存对象
	void Deallocate(void* ptr, size_t size)
    {
        assert(ptr);
	    assert(size <= MAX_BYTES);

        // 找对映射的自由链表桶,对象插入进入
        size_t index = Sizeclass::Index(size);
        _freelists[index].push(ptr);

        // 当链表长度大于一次批量申请的内存时就开始还一段list给central cache
	    if (_freelists[index].Getsize() >= _freelists[index].GetmaxSize())
	    {
		    ListTooLong(_freelists[index], size);
        }
    }

    void ListTooLong(FreeList& list, size_t size)
    {
        void* start = nullptr;
	    void* end = nullptr;
	    list.PopRange(start, end, list.GetmaxSize());

	    CentralCache::GetInstance()->ReleaseListToSpans(start, size);
    }

	// 从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size)
    {
        // 慢开始反馈调节算法
	    // 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
	    // 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
	    // 3、size越大,一次向central cache要的batchNum就越小
	    // 4、size越小,一次向central cache要的batchNum就越大
        size_t batchNum = std::min(_freelists[index].GetmaxSize(), Sizeclass::NumMoveSize(size));
        if(_freelists[index].GetmaxSize() == batchNum)
        {
            _freelists[index].GetmaxSize() += 1;
        }

        void* start = nullptr;
	    void* end = nullptr;
        size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
        assert(actualNum >= 1);
        
        if (actualNum == 1)
        {
            assert(start == end);
            return start;
        }
        else
        {
            _freelists[index].PushRange(NextObj(start), end, actualNum - 1);
            return start;
        }
    }

private:
    FreeList _freelists[NFREELIST];

};

// TLS thread local storage
static __thread ThreadCache* pTLSThreadCache = nullptr;

5.4 TLS–thread local storage的实现

(1)实现的主要功能:

  • 实现不同的线程调用自己专属的Thread cache模块。

(2)具体实现:

#pragma once
#include "Common.hpp"
#include "ThreadCache.hpp"
#include "PageCache.hpp"
#include "ObjectPool.hpp"

static void* ConcurrentAlloc(size_t size)
{
    if(size > MAX_BYTES)
    {
        size_t alignSize = Sizeclass::RoundUp(size);
        size_t kpage = alignSize >> PAGE_SHIFT;

        PageCache::GetInstance()->_pageMtx.lock();
        Span* span = PageCache::GetInstance()->NewSpan(kpage);
        span->_objSize = size;
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		return ptr;
    }
    else
    {
        if(pTLSThreadCache == nullptr)
        {
            static ObjectPool<ThreadCache> tcPool;
            //pTLSThreadCache = new ThreadCache;
            pTLSThreadCache = tcPool.New();
        }

        //cout << std::this_thread::get_id() << " " << ":" << " " << pTLSThreadCache << endl;
        return pTLSThreadCache->Allocate(size);
    }
}

static void ConcurrentFree(void* ptr)
{
    Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
    size_t size = span->_objSize;

	if(size > MAX_BYTES)
    {

        PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
    }
    else
    {
        assert(pTLSThreadCache);

	    pTLSThreadCache->Deallocate(ptr, size);
    }
}

5.5 Central cache模块的实现

(1)实现的主要功能:

  • 从中心缓存获取一定数量的对象给Thread cache。
  • 获取一个非空的Span,当Span不足时从Page cache中获取。

(2)具体实现:

#pragma once
#include "Common.hpp"
#include "PageCache.hpp"

// 单例模式
class CentralCache
{
public:
    static CentralCache* GetInstance()
    {
        return &_sInst;
    }

    // 获取一个非空的span
	Span* GetOneSpan(SpanList& list, size_t size)
    {
        // 查看当前的spanlist中是否有还有未分配对象的span
        Span* iter = list.Begin();
        while(iter != list.End())
        {
            if(iter->_freeList != nullptr)
            {
                return iter;
            }
            else
            {
                iter = iter->_next;
            }
        }
        
        // 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
	    list._mutex.unlock();

        // 走到这里说没有空闲span了,只能找page cache要
	    PageCache::GetInstance()->_pageMtx.lock();
	    Span* span = PageCache::GetInstance()->NewSpan(Sizeclass::NumMovePage(size));
        span->_isUse = true;
        span->_objSize = size;
	    PageCache::GetInstance()->_pageMtx.unlock();

        // 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span
	    // 计算span的大块内存的起始地址和大块内存的大小(字节数)
        char* start = (char*)(span->_pageId << PAGE_SHIFT);
        //cout << (span->_pageId << PAGE_SHIFT) << endl;
        size_t bytes = span->_n << PAGE_SHIFT;
	    char* end = start + bytes;
        
        // 把大块内存切成自由链表链接起来
	    // 1、先切一块下来去做头,方便尾插
        span->_freeList = start;
        start += size;
        void* tail = span->_freeList;
	    int i = 1;
        while (start < end)
        {
            ++i;
            NextObj(tail) = start;
            tail = NextObj(tail);
            start += size;
        }
        
        NextObj(tail) = nullptr;
        // 切好span以后,需要把span挂到桶里面去的时候,再加锁
	    list._mutex.lock();
	    list.pushfront(span);

	    return span;
        
    }

	// 从中心缓存获取一定数量的对象给thread cache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
    {
        size_t index = Sizeclass::Index(size);
	    _spanLists[index]._mutex.lock();

        Span* span = GetOneSpan(_spanLists[index], size);
        
        assert(span);
	    assert(span->_freeList);

	    // 从span中获取batchNum个对象
	    // 如果不够batchNum个,有多少拿多少
        start = span->_freeList;
        end = start;
        size_t i = 0;
        size_t actualNum = 1;
        while(i < batchNum - 1 && NextObj(end) != nullptr)
        {
            end = NextObj(end);
            ++i;
            ++actualNum;
        }

        span->_freeList = NextObj(end);
        NextObj(end) = nullptr;
        span->_useCount += actualNum;
        
        _spanLists[index]._mutex.unlock();

        return actualNum;
    }

    // 将一定数量的对象释放到span跨度
	void ReleaseListToSpans(void* start, size_t byte_size)
    {
        size_t index = Sizeclass::Index(byte_size);
        _spanLists[index]._mutex.lock();

        while(start)
        {
            void* next = NextObj(start);
            Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
            NextObj(start) = span->_freeList;
            span->_freeList = start;
            span->_useCount--;

            // 说明span的切分出去的所有小块内存都回来了
		    // 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并
            if(span->_useCount == 0)
            {
                _spanLists[index].Erase(span);

                span->_next = nullptr;
                span->_prev = nullptr;
                span->_freeList = nullptr;
                //span->_isUse = false;

                // 释放span给page cache时,使用page cache的锁就可以了
			    // 这时把桶锁解掉
			    _spanLists[index]._mutex.unlock();

                PageCache::GetInstance()->_pageMtx.lock();
			    PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			    PageCache::GetInstance()->_pageMtx.unlock();

                _spanLists[index]._mutex.lock();
            }

            start = next;
        }

		_spanLists[index]._mutex.unlock();
    }

private:
	CentralCache()
	{}

	CentralCache(const CentralCache&) = delete;

	static CentralCache _sInst;
	SpanList _spanLists[NFREELIST];

};

CentralCache CentralCache::_sInst;

5.6 Page cache模块的实现

(1)实现的主要功能:

  • 获取一个K页的span给上层。
  • 获取从对象到span的映射方便在回收Span的时候进行合并回收,减少碎片的产生。
  • 当通过上层回收了Span的时候,需要对回收的Span进行合并。

(2)具体实现:

#pragma once
#include "Common.hpp"
#include "ObjectPool.hpp"
#include "PageMap.hpp"

class PageCache
{
public:
	static PageCache* GetInstance()
	{
		return &_sInst;
	}

	// 获取一个K页的span
	Span* NewSpan(size_t k)
    {
        assert(k > 0);

        // 大于128 page的直接向堆申请
        if (k > NPAGES - 1)
        {
            void* ptr = SystemAlloc(k);
            // Span* span = new Span;
            Span* span = _spanPool.New();

            span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
            span->_n = k;

            _idSpanMap[span->_pageId] = span;
            //_idSpanMap.set(span->_pageId, span);
            return span;
        }

        // 先检查第k个桶里面有没有span
        if (!_spanLists[k].empty())
        {
            Span* kSpan = _spanLists[k].popfront();

            // 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
		    for (PAGE_ID i = 0; i < kSpan->_n; ++i)
		    {
			    _idSpanMap[kSpan->_pageId + i] = kSpan;
                //_idSpanMap.set(kSpan->_pageId + i, kSpan);
		    }

            return kSpan;
        }

        // 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
        for(int i = k + 1; i < NPAGES; i++)
        {
            if(!_spanLists[i].empty())
            {
                Span* nSpan = _spanLists[i].popfront();
                Span* kSpan = _spanPool.New();
			    //Span* kSpan = new Span;

                // 在nSpan的头部切一个k页下来
                // k页span返回
                // nSpan再挂到对应映射的位置
                kSpan->_pageId = nSpan->_pageId;
                kSpan->_n = k;
                //cout << nSpan->_pageId << endl;

                nSpan->_pageId += k;
                nSpan->_n -= k;
                _spanLists[nSpan->_n].pushfront(nSpan);

                // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
			    // 进行的合并查找
                _idSpanMap[nSpan->_pageId] = nSpan;
			    _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
                //_idSpanMap.set(nSpan->_pageId, nSpan);
                //_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);

                // 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
			    for (PAGE_ID i = 0; i < kSpan->_n; ++i)
			    {
				    _idSpanMap[kSpan->_pageId + i] = kSpan;
                    //_idSpanMap.set(kSpan->_pageId + i, kSpan);
                }
                
                return kSpan;
            }
        }

        // 走到这个位置就说明后面没有大页的span了
	    // 这时就去找堆要一个128页的span
	    //Span* bigSpan = new Span;
        Span* bigSpan = _spanPool.New();
        void* ptr = SystemAlloc(NPAGES - 1);
	    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
        //cout << bigSpan->_pageId << endl;
	    bigSpan->_n = NPAGES - 1;
	    _spanLists[bigSpan->_n].pushfront(bigSpan);

	    return NewSpan(k);
    }

    // 获取从对象到span的映射
    Span* MapObjectToSpan(void* obj)
    {
        PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);

        std::unique_lock<std::mutex> lock(_pageMtx);
        auto ret = _idSpanMap.find(id);
        if (ret != _idSpanMap.end())
        {
            return ret->second;
        }
        else
        {
            assert(false);
            return nullptr;
        }

        /*auto ret = (Span*)_idSpanMap.get(id);
	    assert(ret != nullptr);
	    return ret;*/
    }

    // 释放空闲span回到Pagecache,并合并相邻的span
    void ReleaseSpanToPageCache(Span* span)
    {
        // 大于128 page的直接还给堆
        if (span->_n > NPAGES - 1)
        {
            void *ptr = (void *)(span->_pageId << PAGE_SHIFT);
            SystemFree(ptr, span->_n);
            // delete span;
            _spanPool.Delete(span);

            return;
        }

        // 对span前后的页,尝试进行合并,缓解内存碎片问题
	    while (1)
	    {
            PAGE_ID prevId = span->_pageId - 1;
		    auto iter = _idSpanMap.find(prevId);
		    // 前面的页号没有,不合并了
            if(iter == _idSpanMap.end())
            {
                break;
            }

            /*auto ret = (Span*)_idSpanMap.get(prevId);
		    if (ret == nullptr)
		    {
			    break;
		    }*/

            Span* prevSpan = iter->second;
            if(prevSpan->_isUse == true)
            {
                break;
            }

            span->_pageId = prevSpan->_pageId;
            span->_n += prevSpan->_n;

            _spanLists[prevSpan->_n].Erase(prevSpan);
            _spanPool.Delete(prevSpan);
            //delete prevSpan;
        }

        // 向后合并
	    while (1)
	    {
            PAGE_ID nextId = span->_pageId + span->_n;
            auto iter = _idSpanMap.find(nextId);
            if(iter == _idSpanMap.end())
            {
                break;
            }

            /*auto ret = (Span*)_idSpanMap.get(nextId);
		    if (ret == nullptr)
		    {
			    break;
		    }*/

            Span* nextSpan = iter->second;
            if(nextSpan->_isUse == true)
            {
                break;
            }

            if(nextSpan->_n + span->_n > NPAGES - 1)
            {
                break;
            }

            span->_n += nextSpan->_n;

            _spanLists[nextSpan->_n].Erase(nextSpan);
            _spanPool.Delete(nextSpan);
            //delete nextSpan;
        }

        _spanLists[span->_n].pushfront(span);
        span->_isUse = false;
	    _idSpanMap[span->_pageId] = span;
	    _idSpanMap[span->_pageId+span->_n - 1] = span;
        //_idSpanMap.set(span->_pageId, span);
        //_idSpanMap.set(span->_pageId+span->_n - 1, span);
    }

public:
	std::mutex _pageMtx;

private:
	SpanList _spanLists[NPAGES];

	PageCache()
	{}

	PageCache(const PageCache&) = delete;

    std::unordered_map<PAGE_ID, Span*> _idSpanMap;
    //TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
    ObjectPool<Span> _spanPool;
	static PageCache _sInst;

};

PageCache PageCache::_sInst;

6. 申请内存的流程解析

6.1 申请内存和计算对齐大小和所属哈希桶详解

(1)分配到pTLSThreadCache线程后再去Allocate内存:


(2)内存进行对齐:


(3)Index计算内存在ThreadCache哪个桶中,例如6b大小在第0个桶中:

6.2 慢反馈算法来获得要需要申请对象的数量

(1)刚开始运行项目的时候_freeList[index]为空,需要去Central cache申请内存。_freeLists[index]的maxsize为空,通过慢反馈获得要申请小对象的数量,如果batchNum为maxsize,则申请小对象数量为1。


(2)慢反馈调节,MAX_BYTES为256k,申请字节越大,返回越少:

6.3 从CentralCache中获得对象

(1)定义start和end,通过FetchRangeObj从CentralCache对应哈希桶中获得对象,如果不够则有多少获得多少,数量为actualNum。**大于1的话将多余的对象插入_freeList[index]中。**返回内存起始地址start,一次内存申请完成。


(2)计算size所在的哈希桶,给对应_spanLists[index]加锁,然后获取span对象:


(3)GetOneSpan函数依次判断桶中的各个span有无小对象(也就是FreeList),如果有则返回:

6.4 从PageCache中获取对象

(1)如果CentralCache对应哈希桶中没有小对象,则去PageCache申请一个span大的空间,将span标记为已使用:

(2)慢反馈NumMovePage,size为8b时,通过NumMoveSize慢反馈获得num为512,npage为4096,右移13位为0,申请1页。


(3)NewSpan获取1页的span,先判断PageCache对应的桶中有没有span,如果有则_spanLists[k]头删一个kspan并返回。


(4)如果PageCache的桶中有没有span,检查后面的桶的span,申请一个1页的span,如果有一个10页的span,则切分为一个1页的kSpan和一个9页的nSpan,返回kSpan。如果PageCache中没有Span,则去堆上申请一个128页的内存,放在bigSpan中,将bigSpan头插到第128个_spanLists中,然后再执行一遍NewSpan返回需要的span。

6.5 切分申请好的span

(1)PageCache申请一个span大的空间后将span进行切分,计算起始地址和结束地址,将切分好的小对象依次链接起来:

6.6 从span中取batchNum个对象

(1)从_spanLists[index]中获取了span,start为_freeList头,将end递增到batchNum个对象,之后的内存接到_freeList中,返回实际获得的对象数量actualNum。

7. 释放内存的流程解析

7.1 获取对象到span的映射

(1)获得从对象到span的映射,获到释放对象的字节大小(8b),该线程调用Deallocate释放:


2)将对象地址强转为PAGE_ID(size_t)类型再右移13位得到对象的id,通过id找到PageCache中分出去对应的span,并返回该span。

7.2 将对象插入自由链表桶

(1)获取Index桶号,将对象Push到对应_freeLists[index]中,释放完成。判断_freeLists[index]大于申请时候的值MaxSize,说明申请的内存都还回来了,则统一释放给CentralCache。当链表过长时释放链表到CentralCache对应_spanLists[index]中。先弹出所有的小对象存到start和end中,再通过start和大小释放给CentralCache。


(2)通过所给的起始地址和个数进行删除:

7.3 释放对象到CentralCache中

(1)通过size获得Index所在的桶,将_spanLists[index]加锁便于插入对象,当start还有对象时,获取下一个对象NextObj,获取start指向的对象的span映射,将start头插到span的_freeList链表中,span分出去的小对象计数_useCount- -,start后移到下一个小对象,然后继续头插。直到将所有小对象插入后解开桶锁。


(2)如果_useCount为0说明所有小对象都回来了,将span从该桶中摘除,指针置空。然后PageCache加锁,将该span还给PageCache。

7.4 释放对象到PageCache

(1)先获取span的页号,减1获得前一个页的页号prevId,通过prevId得到前一个span,如果前一个span没有使用,则与当前的span进行合并,将 span的起始页调整为前一个页段的起始页,并更新页数。然后,从_spanLists中删除prevSpan,在_spanPool中调用delete删除prevSpan。


(2)然后获得下一个span的_pageId,检查下一个span是否没有使用,通过nextId返回对应的Span,合并span的_n页数,将nextSpan在_spanLists中摘除,通过_spanPool调用delete释放nextSpan。


(3)PageCache中的_spanLists中插入还回来的span,标记为未使用状态,建立_pageId和span的映射。

8. 代码测试

(1)测试代码:

#include "ConcurrentAlloc.hpp"
#include <atomic>

// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime(0);
	std::atomic<size_t> free_costtime(0);

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(malloc(16));
					//v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

    cout << nworks << "个线程并发执行" << rounds << "u轮次,每轮次malloc " 
        << ntimes << "次: 花费:" << malloc_costtime << " ms" <<endl;

    cout << nworks << "个线程并发执行" << rounds << "u轮次,每轮次free " 
        << ntimes << "次: 花费:" << free_costtime << " ms" <<endl;

    cout << nworks << "个线程并发malloc&free " << nworks * rounds * ntimes << "次,总计花费:" 
        << malloc_costtime + free_costtime << " ms" <<endl;

	/*printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);

	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);

	printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);*/
}


// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime(0);
	std::atomic<size_t> free_costtime(0);

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(ConcurrentAlloc(16));
					//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

    cout << nworks << "个线程并发执行" << rounds << "u轮次,每轮次concurrent alloc " 
        << ntimes << "次: 花费:" << malloc_costtime << " ms" <<endl;

    cout << nworks << "个线程并发执行" << rounds << "u轮次,每轮次concurrent dealloc " 
        << ntimes << "次: 花费:" << free_costtime << " ms" <<endl;

    cout << nworks << "个线程并发concurrent alloc&dealloc " << nworks * rounds * ntimes << "次,总计花费:" 
        << malloc_costtime + free_costtime << " ms" <<endl;

	/*printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);

	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);

	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);*/
}

int main()
{
	size_t n = 1000;
	cout << "==========================================================" << endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;

	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" << endl;

	return 0;
}

(2)运行结果:

9. 性能分析以及优化

9.1 性能分析

可以看到测试结果在多次申请固定内存时,我们实现的内存池和系统的 malloc 在效率上还是略有差距。可以使用 Visual Studio 自带的性能检测工具分析代码。

9.2 性能优化

(1)我们发现在获取页号到 Span 的函数里为了保护该临界资源加了锁,这把锁占用了较多的资源,故采用类似基数树的哈希表进行优化,结构如下:

(2)此前,我们使用 unordered_map 或 map 维护 PageId 与 Span* 的映射关系,在对该表进行写时都会对数据结构进行修改,比如哈希表的扩容、红黑树的结点旋转,多线程对这种临界资源的修改就需要加锁。所以我们把unordered_map 存放PageId 与 Span* 的映射关系改成基数树来进行存放即可。

(3)采用类似基数树的哈希表进行优化后,不用再对查表过程加锁,原因如下:

  • 对该表进行构造时就开好了空间,二同一个线程的读写是分离的,不会改变整个结构。
  • 多线程对表进行读写时,由于外面还有 Page Cache 里的一把大锁的保护,保证了不同线程互斥地对表进行读写的。

(4)基数树代码:

#pragma once
#include <cstring>
#include <cassert>
#include"Common.hpp"

// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS;
	void** array_;

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap1() {
		//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
		size_t size = sizeof(void*) << BITS;
		size_t alignSize = Sizeclass::_RoundUp(size, 1<<PAGE_SHIFT);
		array_ = (void**)SystemAlloc(alignSize>>PAGE_SHIFT);
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const {
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) {
        array_[k] = v;
        cout << v << endl;
	}
};

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;

	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		assert(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Node* root_;                          // Root of radix tree
	void* (*allocator_)(size_t);          // Memory allocator

	Node* NewNode() {
		Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}

	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
	}

	void set(Number k, void* v) {
		assert(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}

			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
	}
};

10. 项目总结

  1. 本项目实现的是tcmalloc的部分主要代码,并没有全部实现,了解tcmalloc的原理就可以了。

  2. 更加了解tcmalloc的框架。

  3. 参考文章:

    1. 几个内存池库的对比
    2. tcmalloc源码学习。
    3. TCMALLOC 源码阅读。
    4. tcmalloc源代码。
  4. 本项目源码链接:https://gitee.com/liu-yechi/new_code/tree/master/concurrent_memory_pool。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2216214.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL数据的导入

【图书推荐】《MySQL 9从入门到性能优化&#xff08;视频教学版&#xff09;》-CSDN博客 《MySQL 9从入门到性能优化&#xff08;视频教学版&#xff09;&#xff08;数据库技术丛书&#xff09;》(王英英)【摘要 书评 试读】- 京东图书 (jd.com) MySQL9数据库技术_夏天又到了…

(36)高分辨率频谱——通过在时域序列后面补零提高频域分辨率

文章目录 前言一、仿真分析较少的采集数据1.MATLAB代码2.仿真结果 二、高分辨率频谱1.有限信号样本高分辨率频谱的计算方法2.仿真结果 前言 在实际工程应用中&#xff0c;我们很多时候所能采集的信号并不正好是整周期的。此时若对信号做傅里叶变化&#xff0c;得到的结果中包含…

跨境电商不同节点的物流配送实现

由于涉及到国际运输、清关、仓储和本地配送等复杂环节,跨境物流与国内物流有着显著区别。本文将详细介绍跨境电商的不同物流配送模式,以及从头程到尾程各个节点的实现过程,并讨论相应的电商ERP系统在各个环节中的设计要点。 一、跨境电商的物流配送模式 跨境电商的物流配送…

【C】分支与循环2--while/for/do-while/goto以及break和continue在不同循环中的辨析~

分支与循环 while循环 if与while的对比 if(表达式)语句&#xff1b;while(表达式)语句&#xff1b;下面来看一个例子&#xff1a; 用 if 写&#xff1a; #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> int main() {if (1)printf("hehe");//if后面条…

数据结构——排序(2)

数据结构——排序(2) 文章目录 数据结构——排序(2)前言&#xff1a;1.快速排序&#xff08;非递归版本&#xff09;基本步骤&#xff1a;代码实现 2.归并排序算法思想&#xff1a;核心步骤&#xff1a;代码实现&#xff1a;特征总结&#xff1a; 3.计数排序&#xff08;非比较…

跨境电商独立站的本地化

随着全球电商的快速发展,越来越多的跨境电商选择建立独立站以摆脱平台限制,打造品牌影响力。独立站的成功不仅依赖于技术能力和供应链管理,更取决于对目标市场的本地化策略。 本文将从网站建设、商品信息展示、SEO推广等方面分析跨境电商独立站如何进行本地化适配,并讨论如…

SMARTFORM 条形码和二维码

本文主要是记录了二维码和条形码的创建以及在SMARTFORMS的使用 文章目录 SMARTFORM 条形码创建新的BARCODE创建条形码样式创建段落样式SMARTFORM调用条形码 SMARTFORM 二维码创建二维码Module Size 调节二维码的尺寸Module Size: 像素值ErrCorrLev : 容错率二维码的使用 SMARTF…

Springboot3+druid+jasypt+application.yml配置文件数据库密码加密技术

说明 开发环境我们经常把数据库密码直接明文暴露在配置文件中,但是在生产环境,出于安全考虑,必须对数据库密码进行加密。 Jasypt是一个简单易用的Java加密工具库。Jasypt支持多种加密算法,如AES、SHA512、AES_256等,以适应不同的安全需求,可以轻松将加密技术应用于配置…

虚拟机错误:‘VirtualBox Host-Only Ethernet Adapter #2‘

这个错误确实让我很难受。同时我也找了很多的方法&#xff0c;最终得到了这种方法是比较有效的。 参考 https://www.virtualbox.org/ticket/16807

2023年“网络建设与运维”广西省赛试题复盘

2023年“网络搭建与应用”省赛试题复盘 第一部分&#xff1a;网络搭建及安全部署项目 &#xff08;500分&#xff09; 一、竞赛内容分布 “网络搭建与应用”竞赛共分二个部分&#xff0c;其中&#xff1a; 第一部分&#xff1a;网络搭建及安全部署项目 第二部分&#xff1a;服…

Uncaught (in promise) TypeError: Cannot convert object to primitive value

使用vue3的时候报了这个错误&#xff0c;而且还同时报了一个警告 说一下我这里这个错误和警告的原因&#xff0c;是因为我把传给 第三方组件的值 也当做了 第三方组件的 ref&#xff0c;可能没太说清楚&#xff0c;所以接下来看代码&#xff0c;我这里使用的第三方组件是 vxe-t…

Vue2项目中使用 echarts(5.2.2)图表组件含代码(二)

1.图表预览 2.注释说明 Vue 的组件开发 通过 props 定义外部传入的属性&#xff0c;例如 className、width、height、autoResize、chartData 等。使用 Vue 的生命周期钩子函数 mounted 进行 ECharts 的实例化&#xff0c;确保组件加载完毕后才初始化图表。通过 watch 监听 ch…

半小时速通RHCSA

1-7章: #01创建以上目录和文件结构&#xff0c;并将/yasuo目录拷贝4份到/目录下 #02查看系统合法shell #03查看系统发行版版本 #04查看系统内核版本 #05临时修改主机名 #06查看系统指令的查找路径 #07查看passwd指令的执行路径 #08为/yasuo/ssh_config文件在/mulu目录下创建软链…

【Vue】Vue扫盲(四)组件化思想与简单应用

【Vue】Vue扫盲&#xff08;一&#xff09;事件标签、事件修饰符&#xff1a;click.prevent click.stop click.stop.prevent、按键修饰符、及常用指令 【Vue】Vue扫盲&#xff08;二&#xff09;指令&#xff1a;v-for 、v-if、v-else-if、v-else、v-show 【Vue】Vue扫盲&…

Oracle-19g数据库的安装

简介 Oracle是一家全球领先的数据库和云解决方案提供商。他们提供了一套完整的技术和产品&#xff0c;包括数据库管理系统、企业级应用程序、人工智能和机器学习工具等。Oracle的数据库管理系统是业界最受欢迎和广泛使用的数据库之一&#xff0c;它可以管理和存储大量结构化和…

内核定时器API实现点灯

1.内核定时器 定时器是一个很常用的功能&#xff0c;需要周期性处理的工作都要用到定时器。 Linux 内核定时器 采用系统时钟来实现&#xff0c;并不是6ull里面的硬件定时器。 Linux 内核定时器使用很简单&#xff0c;只需要提供超时时间(相当于定时值)和定时处理函数即…

高阶数据结构与算法——红黑树の奥秘

1.认识红黑树 1.1红黑树的概念 红⿊树是⼀棵⼆叉搜索树&#xff0c;他的每个结点增加⼀个存储位来表⽰结点的颜⾊&#xff0c;可以是红⾊或者⿊⾊。通过对任何⼀条从根到叶⼦的路径上各个结点的颜⾊进⾏约束&#xff0c;红⿊树确保没有⼀条路径会⽐其他路径⻓出2倍&#xff0c…

graphrag学习总结

学习视频&#xff1a;b站链接 项目链接 GraphRAG 的基本概念 Document&#xff08;文档&#xff09;&#xff1a;系统中的输入文档。这些文档要么代表CSV中的单独行&#xff0c;要么代表单独的txt文件。 TextUnit&#xff08;文本块&#xff09;&#xff1a;要分析的文本块。…

pdf阅读软件有哪些?5个软件帮助你快速进行pdf阅读

pdf阅读软件有哪些&#xff1f;5个软件帮助你快速进行pdf阅读 如果你正在寻找优秀的PDF阅读软件&#xff0c;以下推荐的5款软件能够帮助你快速、轻松地阅读和管理PDF文件。这些工具各具特色&#xff0c;适用于不同的使用需求&#xff0c;无论是注释、签名、还是管理大文件&…

使用Rollup.js快速开始构建一个前端项目

Rollup 是一个用于 JavaScript 项目的模块打包器&#xff0c;它将小块代码编译成更大、更复杂的代码&#xff0c;例如库或应用程序。Rollup 对代码模块使用 ES6 模块标准&#xff0c;它支持 Tree-shaking&#xff08;摇树优化&#xff09;&#xff0c;可以剔除那些实际上没有被…