共享内存
- 共享内存结构与接口定义
- nginx共享内存在操作系统上的兼容性设计
- 互斥锁
- 锁的结构体
- 锁的一系列操作(core/ngx_shmtx.c)
- 创建锁
- 原子操作
- nginx的上锁操作
- 尝试加锁
- 获取锁
- 释放锁
- 强迫解锁
- 唤醒等待进程
- slab共享内存块管理
- nginx的slab大小规格
- 内存池结构体
- 共享内存池结构体slots
- 分配共享内存池
共享内存结构与接口定义
正常来说,通过malloc函数申请的内存都是进程私有的内存但是Linux会提供共享内存的系统调用,如mmap和munmap等
Nginx基于Linux提供的系统调用,封装了共享内存的数据结构以及共享内存的创建与释放函数,其共享内存结构和接口定义如下:
os/unix/ngx_shmem.h
typedef struct {
u_char *addr; //指向申请的共享内存块首地址
size_t size; //共享内存块大小
ngx_str_t name; //共享内存块名字
ngx_log_t *log; //共享内存块日志
ngx_uint_t exists;//标志是否已经存在
} ngx_shm_t;//共享结构
//以下共享接口
ngx_int_t ngx_shm_alloc(ngx_shm_t *shm);//创建共享内存块
void ngx_shm_free(ngx_shm_t *shm);//释放共享内存块
ngx_int_tngx_shm_alloc(ngx_shm_t *shm)
{
shm->addr = (u_char *) mmap(NULL, shm->size, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0); //创建
if (shm->addr == MAP_FAILED) { //错误处理
ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno, "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size);
return NGX_ERROR;
}
return NGX_OK;//成功,返回
}
void ngx_shm_free(ngx_shm_t *shm)
{
if (munmap((void *) shm->addr, shm->size) == -1)//是否成功
{ //失败处理,记录
ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno, "munmap(%p, %uz) failed", shm->addr, shm->size);
}
}
nginx共享内存在操作系统上的兼容性设计
#if (NGX_HAVE_MAP_ANON)//匿名共享内存
………
#elif (NGX_HAVE_MAP_DEVZERO)//文件共享内存
………
#elif (NGX_HAVE_SYSVSHM)//IPC System V共享内存
………
#endif
互斥锁
- 并发进程访问共享内存时需要加锁。nginx提供了互斥锁的机制,保证了正确的共享内存的访问。nginx的进程主要是通过ngx_shmtx_t进行加锁、解锁等操作。
- nginx实现的时候,如果操作系统提供原子操作机制,就使用操作系统的原子操作实现互斥锁,否则nginx采用文件锁实现互斥。
互斥锁模型
锁的结构体
//core/ngx_shmtx.h
typedef struct {
ngx_atomic_t lock;//0为锁开(空闲),其它(进程号)已上锁
#if (NGX_HAVE_POSIX_SEM) //如果有SEM信号量
ngx_atomic_t wait;//等待共享内存进程总数
#endif
} ngx_shmtx_sh_t;
上锁、解锁的结构体模型:
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS) //若有原子操作
…………
#if (NGX_HAVE_POSIX_SEM) //如果有信号量
…………
#endif
#else
…………
#endif
ngx_uint_t spin;
} ngx_shmtx_t;
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS) //若有原子操作
ngx_atomic_t *lock;//进程内指向共享内存锁的地址
#if (NGX_HAVE_POSIX_SEM) //如果有信号量
ngx_atomic_t *wait; //指向共享内存等待进程总数
ngx_uint_t semaphore; //是否使用信号量,1使用
sem_t sem;//sem_t信号量,可用于线程之中,也可用于进程
#endif
#else //操作系统无原子操作和信号量支持,用文件
ngx_fd_t fd;
u_char *name;
#endif
ngx_uint_t spin;
} ngx_shmtx_t;
锁的一系列操作(core/ngx_shmtx.c)
- ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name);//创建锁
- void ngx_shmtx_destroy(ngx_shmtx_t *mtx);//销毁锁
- void ngx_shmtx_lock(ngx_shmtx_t *mtx);//获取锁
- ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx);//尝试加锁
- void ngx_shmtx_unlock(ngx_shmtx_t *mtx);//释放锁
- ngx_uint_t ngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid);//强制解锁
- static voidngx_shmtx_wakeup(ngx_shmtx_t *mtx)//唤醒等进程
临界区管理的基本思路
①找到临界区
②在临界区前面增加一段用于进行检查的代码,当不满足进入临界区的条件,就不进入,直到满足条件才进入,称为进入区(entry section)。
③在临界区后面加上一段称为离开区(exit section)的代码,作为善后处理。基本形式如下:
创建锁
ngx_int_t ngx_shmtx_create( ngx_shmtx_t *mtx,ngx_shmtx_sh_t *addr, u_char *name);//创建锁
//ngx_shmtx_t *mtx,是进程操作锁结构地址
//ngx_shmtx_sh_t *addr,是共享内存中保存的锁结构地址
//u_char *name,名字(用于区别不同锁)地址
{
mtx->lock = &addr->lock; //将共享内存锁信息储存到进程操作锁结构体中
if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; }
mtx->spin = 2048;//自旋次数指定
#if (NGX_HAVE_POSIX_SEM) //如果是信号量,初始化sem为1,并将semaphore设为1
mtx->wait = &addr->wait;
if (sem_init(&mtx->sem, 1, 0) == -1) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed"); }
else { mtx->semaphore = 1; }
#endif
return NGX_OK;
}
- 进程操作锁结构需要获得(保存)共享内存锁的信息,对于自旋锁,
1)保存共享内存锁的lock;
2设置自旋锁的自旋次数;以便于后续进行加锁、解锁等操作;
mtx->lock = &addr->lock;
if (mtx->spin == (ngx_uint_t) -1) { //已经加锁了 ?
return NGX_OK;
}
mtx->spin = 2048;//nginx设置的进程自旋次数
- 对于信号量,
1)保存共享内存锁的保存wait;
2设置信号量的semaphore或sem的值;以便于后续进行加锁、解锁等操作;
mtx->wait = &addr->wait; //保存指向保存共享内存进程总数指针
if (sem_init(&mtx->sem, 1, 0) == -1) //线程信号量初始化失败
{
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed");
}
else // 线程信号量初始化成功,初始化semaphore为1
{
mtx->semaphore = 1; //使用信号量
}
- 其它可能需要记录的调试信息以及可能的错误处理等
原子操作
计算机系统并发的基础
- 两个原子操作
ngx_atomic_cmp_set(a,old,new):如果*a==old,将*a赋值为new,返回1。否则返回0。
ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid),(原子操作)若*mtx->lock为0,即将*mtx->lock赋值为ngx_pid。
ngx_atomic_fetch_add(old,v):将*old加上v,并返回*old。
ngx_atomic_fetch_add(mtx->wait, 1),将*mtx->wait加上1,并返回加之前的*mtx->wait值。
nginx的上锁操作
当共享内存lock为0(表示空闲)时可以上锁。对于上锁的操作,Nginx将其标准化为将lock(当其为0时)设为进程的PID。即
*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)。
尝试加锁
ngx_uint_tngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}
获取锁
- 进程自旋锁的获取
-
当共享内存lock为0(表示空闲)时可以上锁。对于上锁的操作,nginx将其标准化为将lock(当其为0时)设为进程的PID。即
*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)。 -
因为*mtx->lock 为0时,可能有很多进程都来上锁,但只能有一个进程会成功上锁。因此对上锁进程来讲,以上上锁操作可能不成功。
-
此时,当有多个CPU时,上锁进程可以等待一段T时间后,再次尝试上锁操作。Ngnix对T的构造有其独特的方法。
-
上锁失败,放弃使用CPU
-
void ngx_shmtx_lock(ngx_shmtx_t *mtx)//自旋锁
{
ngx_uint_t i, n; //初始化变量
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock");
for(;;){//不断循环进行
自旋方式加锁;
#if (NGX_HAVE_POSIX_SEM)
信号量方式加锁,wait记录等待共享进程总数,等待进程挂入sem等待队列;
#endif
ngx_sched_yield();//优化方式放弃CPU
}
}
//自旋式加锁
for ( ;; ) {
if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
{ return; } //成功上锁返回
if (ngx_ncpu > 1) { //当有多个CPU时,等待T时间后,再次尝试上锁
for (n = 1; n < mtx->spin; n <<= 1){//构造等待时间T,再多次尝试上锁
{
for (i = 0; i < n; i++) { //每次都有等待时间T ,每次内循环等待次数不一样
ngx_cpu_pause(); //(借用CPU机制)优化自旋等待
}
if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
{ return; } //再次尝试上锁若成功,则返回
}
}
ngx_sched_yield();//(优化)上锁失败,放弃使用CPU 。调度选中后,再次自旋上锁(为啥?)。
}
- 信号量处理锁的获取
如果是信号量:- 等待共享内存进程总数(预先)(原子性操作)加一;
- 当lock为0(表示空闲)时可以上锁。按照nginx标准化上锁操作,也就是将lock(当其为0时)设为进程的PID。即
*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)。
如果成功,将等待共享内存进程数减一(因已成功上锁,预计加需扣除),返回。
(void) ngx_atomic_fetch_add(mtx->wait, 1); //原子操作预加一
if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
{ //上锁成功了
(void) ngx_atomic_fetch_add(mtx->wait, -1); //原子操作减一
return; //返回
}
- 如果上锁失败,将(该加锁进程)挂入sem的等待队列中。由于挂入sem的等待队列操作可能失败,为了确保1)中的加一操作与实际等待进程总数一致性,需要不断尝试挂入等待队列操作,直至成功挂入为止。否则数据将不一致。挂入等待队列的某进程,由释放锁某进程唤醒。
while (sem_wait(&mtx->sem) == -1) {//如果失败,再次进行挂入sem等待队列操作
ngx_err_t err;
err = ngx_errno; //获取错误原因
if (err != NGX_EINTR) { //若是系统原因,进行错误日志处理后,终止尝试
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
"sem_wait() failed while waiting on shmtx");
break;
}
}
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx awoke");
continue; //进行下一个循环
释放锁
void ngx_shmtx_unlock(ngx_shmtx_t *mtx);//释放锁
{
if (mtx->spin != (ngx_uint_t) -1) //调试信息处理
{ ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
"shmtx unlock");
}
if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {//将lock设为0就是释放
ngx_shmtx_wakeup(mtx); //唤醒等待进程
}
}
强迫解锁
ngx_uint_tngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid)//强迫解锁
{
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
“shmtx forced unlock”); //记录调试信息
if (ngx_atomic_cmp_set(mtx->lock, pid, 0)) { //共享内存lock为0(空闲)
ngx_shmtx_wakeup(mtx); //唤醒等待共享进程进程
return 1;
}
return 0;//强制失败,返回0;
}
唤醒等待进程
- 如果有信号量支持://因为只有有信号量支持时,才有sem等待队列
- 如果没有标记使用信号量,(没有构造等待队列)返回。
#if (NGX_HAVE_POSIX_SEM)//由信号量支持
if (!mtx->semaphore)
{
return;
}
………//剩下的其它操作实现
#endif
- 不断使用ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)对mtx->wait减一操作,直至成功将mtx->wait(原子操作)减一。
for ( ;; ) { //不断尝试进行以下方式原子操作减一
wait = *mtx->wait;
if ((ngx_atomic_int_t) wait <= 0)
{ return; } //没有等待共享内存进程,返回。
if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1))
{
break; //成功原子操作减一,终止尝试原子减一。
}
}
- 从sem等待队列中唤醒一个进程;
if (sem_post(&mtx->sem) == -1)
{ //失败唤醒一个进程,错误日志处理
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_post() failed while wake shmtx");
}
slab共享内存块管理
nginx设计与实现了一种基于slab理念的共享内存块机制,并提供了创建共享内存块、从共享内存块中申请与释放内存的API。其结构体包括(core/ngx_slab.h以及core/ngx_slab.c):
- ngx_slab_page_s:内存块管理结构体
ngx_slab_stat_t:内存页使用信息管理结构体
ngx_slab_pool_t:共享内存块结构体
typedef struct ngx_slab_page_s ngx_slab_page_t;
nginx的slab大小规格
内存池结构体
typedef struct ngx_slab_page_s ngx_slab_page_t;
struct ngx_slab_page_s
{
uintptr_t slab;
ngx_slab_page_t *next; //后向
uintptr_t prev;//前向
};
typedef struct {
ngx_uint_t total; //总数
ngx_uint_t used; //使用总数
ngx_uint_t reqs; //请求总数
ngx_uint_t fails;//失败总数
} ngx_slab_stat_t;
// 共享内存池结构体
typedef struct {
ngx_shmtx_sh_t lock; //内存锁
size_t min_size; //可以分配最小内存大小,即为8
size_t min_shift; //最小slab内存的幂数,即min_size=2^ min_shift
ngx_slab_page_t *pages; //指向第一页的管理结构
ngx_slab_page_t *last; //指向最后页的管理结构
ngx_slab_page_t free; //指向空闲首页的一个结点
ngx_slab_stat_t *stats; //指向记录各种规格slab统计信息链表
ngx_uint_t pfree; //空闲总页数
u_char *start; //空闲页始址
u_char *end;//空闲末址
ngx_shmtx_t mutex; //进程操作锁结构
u_char *log_ctx;
u_char zero;
unsigned log_nomem:1;
void *data;
void *addr;//共享内存池结构地址
} ngx_slab_pool_t;
共享内存池结构体slots
1.初始化共享内存池管理结构体各数据成员的值,理清控制管理关系。
2.分出控制管理结构后,剩余的即为可以共享分配的内存池。
管理不同规格的ngx_slab_page_t的首地址,nginx用宏ngx_slab_slots(pool)描述了这一大小位置关系:
(按情形)初始化共享池为0xA5(除共享池管理结构外)
初始化管理不同大小slab的ngx_slab_page_t
(按情形)初始化ngx_slab_stat_t
计算总页数pages
初始化pool的pages
初始化pool的free
初始化管理空页的首个ngx_slab_page_t
初始化pool的start
初始化pool的start,因对齐,修正总空闲数
初始化pool的其它成员
分配共享内存池
- 理论上,每个大小为KB的系统物理页,可以包含k/m个大小为mB规格的slab块。
- 为了标明一个系统物理页中含有的大小为mB规格slab块的占有情况,Nginx为每个系统物理页使用bitmap描述其含有的每个slab块是否空闲。
- 这样,每个大小为KB的系统物理页,需要k/m位描述其每个slab的空闲占有情况,如位1表示占有,如位0表示空闲。
- 对于小块内存(
大小8Byte~32Byte
),需要较多位(512b~128b
)。nginx在内存首页开辟固定区域,码放这些bitmap。 - 对于精确内存(大小为64Byte),需要64b。nginx使管理内存页的ngx_slab_page_t结构体的slab字段作为bitmap。
- 对于大块内存(
大小128Byte~2048Byte
),需要(32b~2b
)使用slab的前32位作为bitmap
void* ngx_slab_alloc(ngx_slab_pool_t *pool, size_t size)
{
void *p;
ngx_shmtx_lock(&pool->mutex);//互斥分配
p = ngx_slab_alloc_locked(pool, size);
ngx_shmtx_unlock(&pool->mutex); //互斥分配
return p;
}