本篇将着重讲解LWLock涉及的主要API工作流程与实现原理,相关基础知识见回顾:postgres 源码解析50 LWLock轻量锁–1
API介绍
函数API | 功能 |
---|---|
CreateLWLocks | 分配LWLocks所需的内存并进行初始化 |
LWLockNewTrancheId | 分配新的Tranche ID,供用户使用Extension模块中的LWLocks |
LWLockAcquire | 获取指定的LWLock |
LWLockConditionalAcquire | 获取LWLock,与LWLockAcquire不同的是未获取到不等待直接返回 |
LWLockAttemptLock | 尝试获取LWLock |
LWLockRelease | 释放已申请的LWLock |
LWLockWakeup | 唤醒等待队列中LWLock |
LWLock的主要操作
1 LWLock的空间分配 (CreateLWLocks)
该函数为LWLocks分配内存空间并初始化,同时注册扩展NamedTranche。
执行流程:
1) 首先计算出LWLocks和NamedTranche所占用的内存空间,该过程定义在LWLockShmemSize函数中;
2)定义并初始化全局LWLocks数组首地址MainLWLockArray;
3) 调用 InitializeLWLocks函数初始化所有LWLocks, 主要是设置lock->state和lock->tranched_id字段信息;
4) 注册扩展LWLocks,即建立Tranche ID与Tranche Name之间的联系。
/*
* Allocate shmem space for the main LWLock array and all tranches and
* initialize it. We also register extension LWLock tranches here.
*/
void
CreateLWLocks(void)
{
StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
"MAX_BACKENDS too big for lwlock.c");
StaticAssertStmt(sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
"Miscalculated LWLock padding");
if (!IsUnderPostmaster)
{
Size spaceLocks = LWLockShmemSize();
int *LWLockCounter;
char *ptr;
/* Allocate space */
ptr = (char *) ShmemAlloc(spaceLocks);
/* Leave room for dynamic allocation of tranches */
ptr += sizeof(int);
/* Ensure desired alignment of LWLock array */
ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
MainLWLockArray = (LWLockPadded *) ptr;
/*
* Initialize the dynamic-allocation counter for tranches, which is
* stored just before the first LWLock.
*/
LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
*LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
/* Initialize all LWLocks */
InitializeLWLocks();
}
/* Register named extension LWLock tranches in the current process. */
for (int i = 0; i < NamedLWLockTrancheRequests; i++)
LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
NamedLWLockTrancheArray[i].trancheName);
}
2 LWLock Tranche ID的分配 (LWLockNewTrancheId)
该函数实现了LWLock的分配,目的是向用户提供使用Extension模块中的轻量锁。
1 首先从共享内存中获取动态分配计数器LWLockCounter的地址;
2 申请spin_t自旋锁,该锁用于保护LWLock的分配过程;
3 自增分配计数器的值,释放锁;
4 最终返回该值作为新分配LWLock的全局标识Tranche ID;
/*
* Allocate a new tranche ID.
*/
int
LWLockNewTrancheId(void)
{
int result;
int *LWLockCounter;
LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
SpinLockAcquire(ShmemLock);
result = (*LWLockCounter)++;
SpinLockRelease(ShmemLock);
return result;
}
3 LWLock锁的获取(LWLockAcquire)
1 安全性检查,pg规定每个进程最多持有200个轻量锁,若超过报错退出;
2 开中断,调用LWLockAttemptLock函数尝试获取轻量锁(第一次),该函数返回值有两种,一种是该锁处于非空闲状态导致未获取到,则需要等待,另一种是成功获取到锁无需等待;
3 根据2结果进行后续处理。如果不等待,则表明成功获取锁跳出循环,反之进入步骤4;
4 调用LWLockQueueSelf函数将该进程加入到锁的等待队列;
5 再次调用LWLockAttemptLock(第二次)函数尝试获取轻量锁,如果无需等待,则成功获取锁,并将自身进程从锁的等待队列中移除跳出循环;反之在锁队列中等待直至被其他进程唤醒,回到步骤2。
6 更新本进程持有的锁数目和锁模式。
注:从源代码可以看出,有两次相邻的LWLockAttemptLock调用,推测其原因为LWLock锁保护的临界区较小,锁的持有时间较短,可能在将自身加入锁等待队列期间,其他进程已释放该锁,在一定程度上能够减小等待时间(一种优化)。
4 LWLockConditionalAcquire
轻量锁获取的另一种接口函数为LWLockConditionalAcquire,该函数与LWLockAcquire的不同点在于获取不到锁时不等待直接返回。
5 LWLockAttemptLock
该函数的功能是尝试获取LWLock,其本质是通过一系列原子操作来实现指定模式下锁的获取。
1 首先调用 pg_atomic_read_u32读取该锁的状态信息old_state;
2 进入死循环
1)根据指定的锁模式判断锁是否处于空闲lock_free并更新desired_state(该变量初始值=old_state,更新操作是加上LW_VAL_EXCLUSIVE/LW_VAL_SHRAED);
2)如果成功调用pg_atomic_compare_exchange_u32函数实现上述old_stat与desired_state交换,则进一步根据lock_free标识判断:若lock_free = true, 则标记获取该LWLock并结束;反之,获取不到该锁并结束;
3)若未成功调用pg_atomic_compare_exchange_u32函数,则表明该锁已被其他人获取,需要再次循环重试。
/*
* Internal function that tries to atomically acquire the lwlock in the passed
* in mode.
*
* This function will not block waiting for a lock to become free - that's the
* callers job.
*
* Returns true if the lock isn't free and we need to wait.
*/
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
uint32 old_state;
AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
/*
* Read once outside the loop, later iterations will get the newer value
* via compare & exchange.
*/
old_state = pg_atomic_read_u32(&lock->state);
/* loop until we've determined whether we could acquire the lock or not */
while (true)
{
uint32 desired_state;
bool lock_free;
desired_state = old_state;
if (mode == LW_EXCLUSIVE)
{
lock_free = (old_state & LW_LOCK_MASK) == 0;
if (lock_free)
desired_state += LW_VAL_EXCLUSIVE;
}
else
{
lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
if (lock_free)
desired_state += LW_VAL_SHARED;
}
/*
* Attempt to swap in the state we are expecting. If we didn't see
* lock to be free, that's just the old value. If we saw it as free,
* we'll attempt to mark it acquired. The reason that we always swap
* in the value is that this doubles as a memory barrier. We could try
* to be smarter and only swap in values if we saw the lock as free,
* but benchmark haven't shown it as beneficial so far.
*
* Retry if the value changed since we last looked at it.
*/
if (pg_atomic_compare_exchange_u32(&lock->state,
&old_state, desired_state))
{
if (lock_free)
{
/* Great! Got the lock. */
if (mode == LW_EXCLUSIVE)
lock->owner = MyProc;
return false;
}
else
return true; /* somebody else has the lock */
}
}
pg_unreachable();
}
LWLock锁的释放 (LWLockRelease)
LWLock锁的释放定义在LWLockRelease函数中,实现流程如:
1 首先检查该LWLock是否持有,如果不持有则报错退出;
2 将锁从进程自身的持锁队列中移除,更新持锁队列;
3 根据锁模式,调用pg_atomic_sub_fetch_u32函数清除锁标记表明锁释放;
4 如果有其他进程等待该锁,则需根据某种调度策略唤醒等待进程;
LWLockWakeup
该函数负责将锁等待队列中进程唤醒,其唤醒规则为;
1 如果等待队列中的第一个申请者申请的是排他锁,则只有该进程被唤醒,其他进程仍处于等待状态;
2 如果等待队列中的第一个申请者申请的是共享锁,那么所有申请共享锁的进程都可以被唤醒,只留下申请排他锁的进程;
唤醒策略示意图