🎇C++学习历程:入门
- 博客主页:一起去看日落吗
- 持续分享博主的C++学习历程
博主的能力有限,出现错误希望大家不吝赐教
- 分享给大家一句我很喜欢的话: 也许你现在做的事情,暂时看不到成果,但不要忘记,树🌿成长之前也要扎根,也要在漫长的时光🌞中沉淀养分。静下来想一想,哪有这么多的天赋异禀,那些让你羡慕的优秀的人也都曾默默地翻山越岭🐾。
💐 🌸 🌷 🍀
目录
- 💐1. 高并发内存池整体框架设计
- 💐2. threadcache
- 🌷2.1 threadcache整体设计
- 🌷2.2 threadcache哈希桶映射对齐规则
- 🌷2.3 threadcacheTLS无锁访问
- 💐3. 完整代码
- 🌷3.1 Common.hpp
- 🌷3.2 ThreadCache.cpp
- 🌷3.3 ConcurrentAlloc.hpp
- 🌷3.4 ThreadCache.hpp
💐1. 高并发内存池整体框架设计
- 该项目解决的是什么问题?
现代很多的开发环境都是多核多线程,因此在申请内存的时,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀了,但是在并发场景下可能会因为频繁的加锁和解锁导致效率有所降低,而该项目的原型tcmalloc实现的就是一种在多线程高并发场景下更胜一筹的内存池。
在实现内存池时我们一般需要考虑到效率问题和内存碎片的问题,但对于高并发内存池来说,我们还需要考虑在多线程环境下的锁竞争问题。
- 高并发内存池整体框架设计
高并发内存池主要由以下三个部分构成:
- thread cache: 线程缓存是每个线程独有的,用于小于等于256KB的内存分配,每个线程独享一个thread cache。
- central cache: 中心缓存是所有线程所共享的,当thread cache需要内存时会按需从central cache中获取内存,而当thread cache中的内存满足一定条件时,central cache也会在合适的时机对其进行回收。
- page cache: 页缓存中存储的内存是以页为单位进行存储及分配的,当central cache需要内存时,page cache会分配出一定数量的页分配给central cache,而当central cache中的内存满足一定条件时,page cache也会在合适的时机对其进行回收,并将回收的内存尽可能的进行合并,组成更大的连续内存块,缓解内存碎片的问题。
进一步说明:
-
每个线程都有一个属于自己的thread cache,也就意味着线程在thread cache申请内存时是不需要加锁的,而一次性申请大于256KB内存的情况是很少的,因此大部分情况下申请内存时都是无锁的,这也就是这个高并发内存池高效的地方。
-
每个线程的thread cache会根据自己的情况向central cache申请或归还内存,这就避免了出现单个线程的thread cache占用太多内存,而其余thread cache出现内存吃紧的问题。
-
多线程的thread cache可能会同时找central cache申请内存,此时就会涉及线程安全的问题,因此在访问central cache时是需要加锁的,但central cache实际上是一个哈希桶的结构,只有当多个线程同时访问同一个桶时才需要加锁,所以这里的锁竞争也不会很激烈。
各个部分的主要作用:
-
thread cache主要解决锁竞争的问题,每个线程独享自己的thread cache,当自己的thread cache中有内存时该线程不会去和其他线程进行竞争,每个线程只要在自己的thread cache申请内存就行了。
-
central cache主要起到一个居中调度的作用,每个线程的thread cache需要内存时从central cache获取,而当thread cache的内存多了就会将内存还给central cache,其作用类似于一个中枢,因此取名为中心缓存。
-
page cache就负责提供以页为单位的大块内存,当central cache需要内存时就会去向page cache申请,而当page cache没有内存了就会直接去找系统,也就是直接去堆上按页申请内存块。
💐2. threadcache
🌷2.1 threadcache整体设计
定长内存池只支持固定大小内存块的申请释放,因此定长内存池中只需要一个自由链表管理释放回来的内存块。现在我们要支持申请和释放不同大小的内存块,那么我们就需要多个自由链表来管理释放回来的内存块,因此thread cache实际上一个哈希桶结构,每个桶中存放的都是一个自由链表。
thread cache支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的。
这时我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如我们让这些字节数都按照8字节进行向上对齐,那么thread cache的结构就是下面这样的,此时当线程申请18字节的内存时会直接给出8字节,而当线程申请916字节的内存时会直接给出16字节,以此类推。
因此当线程要申请某一大小的内存块时,就需要经过某种计算得到对齐后的字节数,进而找到对应的哈希桶,如果该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回;如果该自由链表已经为空了,那么就需要向下一层的central cache进行获取了。
但此时由于对齐的原因,就可能会产生一些碎片化的内存无法被利用,比如线程只申请了6字节的内存,而thread cache却直接给了8字节的内存,这多给出的2字节就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内部碎片。
我们首先先实现自由链表所需的三个必要功能:头插,头删,判断空
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
//管理切分好的小对象的自由链表
class FreeList
{
public:
//将释放的头对象头插进自由链表
void Push(void* obj)
{
assert(obj);
//头插
NextObj(obj) = _freeList;
_freeList = obj;
}
//从自由链表头部获取对象
void* Pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
_freeList = NextObj(_freeList);
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
private:
void* _freeList = nullptr;//自由链表
};
因此thread cache实际就是一个数组,数组中存储的就是一个个的自由链表,至于这个数组中到底存储了多少个自由链表,就需要看我们在进行字节数对齐时具体用的是什么映射对齐规则了。
🌷2.2 threadcache哈希桶映射对齐规则
- 如何进行对齐?
首先,这些内存块是会被链接到自由链表上的,因此一开始肯定是按8字节进行对齐是最合适的,因为我们必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能够存储得下一个指针。
但如果所有的字节数都按照8字节进行对齐的话,那么我们就需要建立 256 × 1024 ÷ 8 = 32768 256\times1024\div8=32768 256×1024÷8=32768个桶,这个数量还是比较多的,实际上我们可以让不同范围的字节数按照不同的对齐数进行对齐,具体对齐方式如下:
字节数 | 对齐数 | 哈希桶下标 |
---|---|---|
[1,128] | 8 | [0,16) |
[128+1,1024] | 16 | [16,72) |
[1024+1,8*1024] | 128 | [72,128) |
[81024+1,641024] | 1024 | [128,184) |
[641024+1,2561024] | 8*1024 | [184,208) |
- 空间浪费率
虽然对齐产生的内碎片会引起一定程度的空间浪费,但按照上面的对齐规则,我们可以将浪费率控制到百分之十左右。需要说明的是,1~128这个区间我们不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率,这里我们就从第二个区间开始进行计算。
- 对齐和映射相关函数的编写
此时有了字节数的对齐规则后,我们就需要提供两个对应的函数,分别用于获取某一字节数对齐后的字节数,以及该字节数对应的哈希桶下标。关于处理对齐和映射的函数,我们可以将其封装到一个类当中。
//管理对齐和映射等关系
class SizeClass
{
public:
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes);
//获取对应哈希桶的下标
static inline size_t Index(size_t bytes);
};
需要注意的是,SizeClass类当中的成员函数最好设置为静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数。
在获取某一字节数向上对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理。
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
if (bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else
{
assert(false);
return -1;
}
}
此时我们就需要编写一个子函数,该子函数需要通过对齐数计算出某一字节数对齐后的字节数,最容易想到的就是下面这种写法。
//一般写法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
size_t alignSize = 0;
if (bytes%alignNum != 0)
{
alignSize = (bytes / alignNum + 1)*alignNum;
}
else
{
alignSize = bytes;
}
return alignSize;
}
除了上述写法,我们还可以通过位运算的方式来进行计算,虽然位运算可能并没有上面的写法容易理解,但计算机执行位运算的速度是比执行乘法和除法更快的。
//位运算写法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1)&~(alignNum - 1));
}
在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理。
//获取对应哈希桶的下标
static inline size_t Index(size_t bytes)
{
//每个区间有多少个自由链表
static size_t groupArray[4] = { 16, 56, 56, 56 };
if (bytes <= 128)
{
return _Index(bytes, 3);
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 4) + groupArray[0];
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 7) + groupArray[0] + groupArray[1];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3];
}
else
{
assert(false);
return -1;
}
}
此时我们需要编写一个子函数来继续进行处理,容易想到的就是根据对齐数来计算某一字节数对应的下标。
//一般写法
static inline size_t _Index(size_t bytes, size_t alignNum)
{
size_t index = 0;
if (bytes%alignNum != 0)
{
index = bytes / alignNum;
}
else
{
index = bytes / alignNum - 1;
}
return index;
}
当然,为了提高效率下面也提供了一个用位运算来解决的方法,需要注意的是,此时我们并不是传入该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3。
//位运算写法
static inline size_t _Index(size_t bytes, size_t alignNum)
{
return ((bytes + (1 << alignShift) - 1) >> alignNum) - 1;
}
- ThreadCache类
按照上述的对齐规则,thread cache中桶的个数,也就是自由链表的个数是208,以及thread cache允许申请的最大内存大小256KB,我们可以将这些数据按照如下方式进行定义。
//小于等于MAX_BYTES,就找thread cache申请
//大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;
现在就可以对ThreadCache类进行定义了,thread cache就是一个存储208个自由链表的数组
class ThreadCache {
public:
//申请对象
void* Allocate(size_t size);
void* Deallocate(void* ptr,size_t size);
void* FetchFromCentralCache(size_t index,size_t size);
private:
FreeList _freeLists[NFREELISTS];//哈希桶
};
在thread cache申请对象时,通过所给字节数计算出对应的哈希桶下标,如果桶中自由链表不为空,则从该自由链表中取出一个对象进行返回即可;但如果此时自由链表为空,那么我们就需要从central cache进行获取了
//申请内存对象
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
if(!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index,alignSize);
}
}
🌷2.3 threadcacheTLS无锁访问
每个线程都有一个自己独享的thread cache,那应该如何创建这个thread cache呢?我们不能将这个thread cache创建为全局的,因为全局变量是所有线程共享的,这样就不可避免的需要锁来控制,增加了控制成本和代码复杂度。
要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
//TLS - Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
但不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache,因此在申请内存的函数中会包含以下逻辑。
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
💐3. 完整代码
🌷3.1 Common.hpp
//
// Common.h
// ThreadCache
//
// Created by 卜绎皓 on 2023/1/31.
//
#ifndef Common_h
#define Common_h
#endif /* Common_h */
#include <iostream>
#include <vector>
#include <assert.h>
#include <thread>
#include <time.h>
//小于等于MAX_BYTES,就找thread cache申请
//大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
class FreeList
{
public:
//将释放的头对象头插进自由链表
void Push(void* obj)
{
assert(obj);
//头插
NextObj(obj) = _freeList;
_freeList = obj;
}
//从自由链表头部获取对象
void* Pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
_freeList = NextObj(_freeList);
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
private:
void* _freeList = nullptr;//自由链表
};
//管理对齐和映射等关系
class SizeClass
{
public:
static inline size_t _RoundUp(size_t bytes,size_t alignNum)
{
// //一般写法
// size_t alignSize = 0;
// if(bytes % alignNum != 0)
// {
// alignSize = (bytes / alignNum + 1)*alignNum;
// }
// else
// {
// alignSize = bytes;
// }
// return alignSize;
//位运算写法
return ((bytes + alignNum - 1)&~(alignNum-1));
}
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
if(bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if(bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if(bytes <= 8*1024)
{
return _RoundUp(bytes, 128);
}
else if(bytes <= 64*1024)
{
return _RoundUp(bytes, 1024);
}
else if(bytes <= 256*1024)
{
return _RoundUp(bytes, 8*1024);
}
else{
assert(false);
return -1;
}
}
//获取对应哈希桶的下标
static inline size_t _Index(size_t bytes,size_t alignNum)
{
// //一般写法
// size_t index = 0;
// if(bytes % alignNum != 0)
// {
// index = bytes / alignNum;
// }
// else
// {
// index = bytes / alignNum - 1;
// }
// return index;
//位运算写法
return ((bytes + (1 << alignNum) - 1) >> alignNum) - 1;
}
//获取对应哈希桶的下标
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
//每个区间有多少个自由链表
static size_t grounpArray[4] = {16,56,56,56};
if(bytes <= 128)
{
return _Index(bytes, 3);
}
else if(bytes <= 1024)
{
return _Index(bytes - 128, 4) + grounpArray[0];
}
else if(bytes <= 8*1024)
{
return _Index(bytes - 128, 7) + grounpArray[0] + grounpArray[1];
}
else if(bytes <= 64*1024)
{
return _Index(bytes - 128, 10) + grounpArray[0] + grounpArray[1] + grounpArray[2];
}
else if(bytes <= 256*1024)
{
return _Index(bytes - 128, 13) + grounpArray[0] + grounpArray[1] + grounpArray[2] + grounpArray[3];
}
else
{
assert(false);
return -1;
}
}
};
🌷3.2 ThreadCache.cpp
//
// ThreadCache.cpp
// ThreadCache
//
// Created by 卜绎皓 on 2023/1/31.
//
#include "ThreadCache.hpp"
//申请内存对象
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
if(!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index,alignSize);
}
}
🌷3.3 ConcurrentAlloc.hpp
//
// ConcurrentAlloc.h
// ThreadCache
//
// Created by 卜绎皓 on 2023/1/31.
//
#ifndef ConcurrentAlloc_h
#define ConcurrentAlloc_h
#endif /* ConcurrentAlloc_h */
#include "Common.hpp"
#include "ThreadCache.hpp"
static void* ConcurrentAlloc(size_t size)
{
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
cout << std::this_thread::get_id() << ":"<<pTLSThreadCache<<endl;
return pTLSThreadCache->Allocate(size);
}
static void ConcurrentFree(void* ptr, size_t size)
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
//}
🌷3.4 ThreadCache.hpp
//
// ThreadCache.hpp
// ThreadCache
//
// Created by 卜绎皓 on 2023/1/31.
//
#ifndef ThreadCache_hpp
#define ThreadCache_hpp
#endif /* ThreadCache_hpp */
#include "Common.hpp"
class ThreadCache {
public:
//申请对象
void* Allocate(size_t size);
void* Deallocate(void* ptr,size_t size);
void* FetchFromCentralCache(size_t index,size_t size);
private:
FreeList _freeLists[NFREELISTS];//哈希桶
};
//TLS - Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;