10-pg内核之锁管理器(五)行锁

news2025/1/16 16:14:28

概念

数据库采用MVCC方式进行并发控制,读写并不会互相阻塞,但是写之间仍然存在冲突。如果还是采用常规锁那样加锁,则会耗费大量共享内存,进而影响性能。所以行锁通过元组级常规锁和xmax结合的方式实现。一般先通过xmax进行可见性判断,如果出现冲突,则等待的一方申请元组级常规锁,这样就能避免申请过多的锁。
行锁的加锁方式有两种:

  • 对元组进行DELETE/UPDATE操作
  • 显示指定加行锁

行锁加锁流程主要分以下两种:

  • 如果没有其他事务在更新或对该元组加锁,则当前事务占有该元组,并将xmax置为当前的事务ID;如果已经有事务正在更新该元组,即xmax不为0,且对应事务还未结束,则当前事务等待xmax中的事务结束
  • 如果已经有事务正在更新该元组,即xmax不为0,且对应事务还未结束,则当前事务需要等待xmax中的事务结束,并申请元组级常规锁;如果已经有事务持有该锁,那么还需要等待该锁释放,获取到元组级锁后再等待xmax中的事务释放。

在这里插入图片描述

显式加行锁

  • FOR UPDATE: 针对查询到的元组添加排它锁,此时如果有其他事务修改元组,就需要等待行锁释放才行。
  • FOR NO KEY UPDATE: 对元组加FOR KEY SHARE锁,对应LockTupleNoKey Exclusive
  • FOR ShARE:对元组加共享锁,共享锁之间互不冲突,对应LockTupleShare
  • FOR KEY SHARE: 它允许对非索引键值的列进行更新,但阻止对索引键值的更新操作,对应LockTuplekeyShare

行锁的相容性矩阵

类型FOR KEY SHAREFOR SHAREFOR NO KEY UPDATEFOR UPDATE
FOR KEY SHARE
FOR SHARE
FOR NO KEY UPDATE
FOR UPDATE

DELETE/UPDATE 加锁

状态类型子操作说明
DELETE加行级排他锁,对应LockTupleExclusive
UPDATE不修改键值加行级排他锁,对应LockTupleNoKeyExclusive
UPDATE修改键值加行级排他锁,对应LockTup了Exclusive

结构体

LockTupleMode

行锁的模式,共4个

typedef enum LockTupleMode
{
	/* SELECT FOR KEY SHARE */
	LockTupleKeyShare,
	/* SELECT FOR SHARE */
	LockTupleShare,
	/* SELECT FOR NO KEY UPDATE, and UPDATEs that don't modify key columns */
	LockTupleNoKeyExclusive,
	/* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
	LockTupleExclusive
} LockTupleMode;

tupleLockExtraInfo

由于行级锁是由常规锁和xmax相结合实现的,对于常规锁来讲,它就需要建立LockTupleMode和常规锁之间的映射关系,映射关系由结构体tupleLockExtraInfo描述。

  • hwlock: 行级锁对应的常规锁模式,因为行级锁模式只有4种,所以其对应的常规锁中的四种:AccessShareLock,RowShareLock,ExclusiveLock,AccessExclusiveLock
  • lockstaus: 显式加锁时的锁模式,有4种,分别为:MultiXactStatusForKeyShare,MultiXactStatusForShare,MultiXactStatusForNoKeyUpdate,MultiXactStatusForUpdate
  • updstatus: 通过DELETE/UPDATE加锁时的锁模式,主要有两种:MultiXactStatusNoKeyUpdate,MultiXactStatusUpdate

上述映射的行锁模式由枚举类型MultiXactStatus描述,类型内前4个对应的是lockstatus的值,后两个对应的是updstatus的值

typedef enum
{
	MultiXactStatusForKeyShare = 0x00,
	MultiXactStatusForShare = 0x01,
	MultiXactStatusForNoKeyUpdate = 0x02,
	MultiXactStatusForUpdate = 0x03,
	/* an update that doesn't touch "key" columns */
	MultiXactStatusNoKeyUpdate = 0x04,
	/* other updates, and delete */
	MultiXactStatusUpdate = 0x05
} MultiXactStatus;

tupleLockExtraInfo描述成员有4个,其结构及成员如下:

static const struct
{
	LOCKMODE	hwlock; //常规锁
	int			lockstatus; //通过显式加锁指定的锁模式
	int			updstatus; //通过DELETE/UPDATE加锁时的锁模式
}

tupleLockExtraInfo[MaxLockTupleMode + 1] =
{
	{	/* LockTupleKeyShare对应的锁 */
		AccessShareLock,
		MultiXactStatusForKeyShare,
		-1	/* KeyShare does not allow updating tuples */
	},
	{	/* LockTupleShare对应的锁 */
		RowShareLock,
		MultiXactStatusForShare,
		-1	/* Share does not allow updating tuples */
	},
	{	/* LockTupleNoKeyExclusive对应的锁 */
		ExclusiveLock,
		MultiXactStatusForNoKeyUpdate,
		MultiXactStatusNoKeyUpdate
	},
	{	/* LockTupleExclusive对应的锁 */
		AccessExclusiveLock,
		MultiXactStatusForUpdate,
		MultiXactStatusUpdate
	}
};

MultiXactID

tupleLockExtraInfo结构的行锁的锁模式都是由MultiXactStatus来表示,那我们首先要了解一下MultiXactID。
正常情况下,如果只有一个事务对某个元组加锁,这时是用不到上述的行锁的,只用将事务ID保存到xmax,并设置一下t_infomask的值即可。但是如果有多个事务同时对元组加锁,xmax就无法准确的记录加锁的事务了,此时可以将多个事务ID的映射关系保存都MultiXact日志中,并生成一个MultiXactID保存到xmax中,这个就能通过MultiXactID从MultiXact日志中找到对应的事务ID的信息。MultiXact日志的信息可以参考 MultiXact日志管理器
需要注意的是,使用MultiXactID时,元组的infomask要添加HEAP_XMAX_IS_MultiXact标记。
只有一个事务对元组加锁时,只需要xmax和t_infomask标记就足以表示。
t_infomask中行锁相关的标记位如下:

  • HEAP_XMAX_EXCL_LOCK:表示加的有排他锁,FOR UPDATE和FOR NO KEY UPDATE共用该标记位。
  • HEAP_XMAX_KEYSHR_LOCK:是一个KEY共享锁,FOR KEY SHARE子句对应该标记。
  • HEAP_XMAX_LOCK_ONLY:只有显式加锁时对应该标记
  • HEAP_XMAX_SHR_LOCK: 共享锁,FOR SHARE子句对于该标记
  • HEAP_XMAX_IS_MULTI:表示多个事务对元组加锁
  • HEAP_KEYS_UPDATED: 元组键值要被更新时添加此标记,DELETE/UPDATE操作加锁时对应该标记

每种行锁模式对应的标记位组合为:

  • FOR UPDATE:HEAP_XMAX_EXCL_LOCK | HEAP_XMAX_LOCK_ONLY |HEAP_KEYS_UPDATED
  • FOR NO KEY UPDATE:HEAP_XMAX_EXCL_LOCK | HEAP_XMAX_LOCK_ONLY
  • FOR SHARE:HEAP_XMAX_SHR_LOCK | HEAP_XMAX_LOCK_ONLY
  • FOR KEY SHARE:HEAP_XMAX_SHR_LOCK | HEAP_XMAX_LOCK_ONLY
  • UPDATE:HEAP_XMAX_EXCL_LOCK | HEAP_KEYS_UPDATED
  • DELETE:HEAP_XMAX_EXCL_LOCK | HEAP_KEYS_UPDATED

主要函数

行锁的使用流程,可通过元组的更新/删除操作时的可见性判断来说明,以更新流程为例,在元组更新前,需要通过MVCC可见性判断函数HeapTupleSatisfiesUpdate来判断元组是否满足可以更新的条件,其中就会判断元组的加锁情况,并给出对应的结果。MVCC机制可以参考[MVCC机制](pg内核之事务管理器(五) MVCC)

HeapTupleSatisfiesUpdate

该函数是利用MVCC机制判断元组是否满足更新的条件,判断的结果有下面几类:

  • TM_OK: 元组可以更新
  • TM_Invisible: 元组不可见,不能被更新
  • TM_SelfModified: 元组已经被当前事务更新过
  • TM_Updated:元组已经被其他事务更新过,且已提交
  • TM_Deleted:元组已经被其他事务删除,且已提交,不能更新
  • TM_BeingModified: 元组正在被其他事务更新
  • TM_WouldBlock:支持行锁的Skip Lock功能

在这里插入图片描述

HeapTupleSatisfiesUpdate函数流程

TM_Result
HeapTupleSatisfiesUpdate(HeapTuple htup, CommandId curcid,
						 Buffer buffer)
{
	HeapTupleHeader tuple = htup->t_data; //获取到元组头信息

	Assert(ItemPointerIsValid(&htup->t_self));
	Assert(htup->t_tableOid != InvalidOid);

	if (!HeapTupleHeaderXminCommitted(tuple))//判断xmin是否提交
	{
		if (HeapTupleHeaderXminInvalid(tuple))//xmin无效,元组无效
			return TM_Invisible;
		else if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuple)))//如果xmin是为当前事务
		{
			if (HeapTupleHeaderGetCmin(tuple) >= curcid) //插入在快照之后,不可见
				return TM_Invisible;	/* inserted after scan started */

			if (tuple->t_infomask & HEAP_XMAX_INVALID)	/* 无其他事务更新或对该元组加锁 */
				return TM_Ok;

			if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))//该元组被加锁
			{
				TransactionId xmax;

				xmax = HeapTupleHeaderGetRawXmax(tuple); //获取到xmax的值
				if (tuple->t_infomask & HEAP_XMAX_IS_MULTI) //有多个事务对元组加锁
				{
					if (MultiXactIdIsRunning(xmax, true)) //加锁事务正在运行中
						return TM_BeingModified;
					else //加锁事务已结束
						return TM_Ok;
				}
				if (!TransactionIdIsInProgress(xmax))//xmax已结束,可更新
					return TM_Ok;
				return TM_BeingModified; //正在被更新
			}

			if (tuple->t_infomask & HEAP_XMAX_IS_MULTI)//xmax是MultiXact
			{
				TransactionId xmax;

				xmax = HeapTupleGetUpdateXid(tuple);//获取更新元组的事务ID
				if (!TransactionIdIsCurrentTransactionId(xmax))//更新的事务不是当前事务
				{
					if (MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple),
											 false))//更新事务在运行中
						return TM_BeingModified;
					return TM_Ok;
				}
				else
				{
					if (HeapTupleHeaderGetCmax(tuple) >= curcid) //更新在快照后
						return TM_SelfModified; /* updated after scan started */
					else//更新在快照前
						return TM_Invisible;	/* updated before scan started */
				}
			}

			if (!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))//xmax不是当前事务
			{
				SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
							InvalidTransactionId);
				return TM_Ok;
			}

			if (HeapTupleHeaderGetCmax(tuple) >= curcid)//更新在快照后
				return TM_SelfModified; /* updated after scan started */
			else//更新在快照前
				return TM_Invisible;	/* updated before scan started */
		}
		else if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmin(tuple)))//xmin不是当前事务且仍在运行中
			return TM_Invisible;
		else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple))) //xmin不是当前事务且已提交
			SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
						HeapTupleHeaderGetRawXmin(tuple));
		else//其他情况
		{
			/* it must have aborted or crashed */
			SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
						InvalidTransactionId);
			return TM_Invisible;
		}
	}
	//到这里说明插入元组的事务已经提交,下面是判断xmax的状态

	if (tuple->t_infomask & HEAP_XMAX_INVALID)	/* 元组还未被更新或加锁 */
		return TM_Ok;

	if (tuple->t_infomask & HEAP_XMAX_COMMITTED)//xmax已提交
	{
		if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))//xmax只是加锁
			return TM_Ok;
		if (!ItemPointerEquals(&htup->t_self, &tuple->t_ctid)) //被其他事务更新
			return TM_Updated;	/* updated by other */
		else //被其他事务删除
			return TM_Deleted;	/* deleted by other */
	}

	if (tuple->t_infomask & HEAP_XMAX_IS_MULTI) //xmax是MultiXactID
	{
		TransactionId xmax;

		if (HEAP_LOCKED_UPGRADED(tuple->t_infomask)) //是否是FOR SHARE锁
			return TM_Ok;

		if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask)) //xmax仅是加锁
		{
			if (MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple), true))//xmax正在运行中
				return TM_BeingModified;

			SetHintBits(tuple, buffer, HEAP_XMAX_INVALID, InvalidTransactionId);
			return TM_Ok;
		}

		xmax = HeapTupleGetUpdateXid(tuple);//获取更新的事务ID
		if (!TransactionIdIsValid(xmax)) //xmax无效
		{
			if (MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple), false))//xmax正在运行中
				return TM_BeingModified;
		}

		/* not LOCKED_ONLY, so it has to have an xmax */
		Assert(TransactionIdIsValid(xmax));

		if (TransactionIdIsCurrentTransactionId(xmax))//xmax是当前事务
		{
			if (HeapTupleHeaderGetCmax(tuple) >= curcid)//更新在快照后
				return TM_SelfModified; /* updated after scan started */
			else//更新在快照前
				return TM_Invisible;	/* updated before scan started */
		}

		if (MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple), false))//xmax正在运行中
			return TM_BeingModified;

		if (TransactionIdDidCommit(xmax))//xmax已提交
		{
			if (!ItemPointerEquals(&htup->t_self, &tuple->t_ctid)) //是UPDATE
				return TM_Updated;
			else //是DELETE
				return TM_Deleted;
		}

		/* 到这里的话表名xmax已经异常终止了   */
		if (!MultiXactIdIsRunning(HeapTupleHeaderGetRawXmax(tuple), false))//xmax没有在运行
		{
			SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
						InvalidTransactionId);
			return TM_Ok;
		}
		else//xmax在运行
		{
			return TM_BeingModified;
		}
	}

	if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))//xmax是当前事务
	{
		if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))//只有加锁
			return TM_BeingModified;
		if (HeapTupleHeaderGetCmax(tuple) >= curcid) //更新在快照后
			return TM_SelfModified; /* updated after scan started */
		else//更新在快照前
			return TM_Invisible;	/* updated before scan started */
	}

	if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmax(tuple)))//xmax正在运行中
		return TM_BeingModified;

	if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))//xmax未提交
	{
		SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
					InvalidTransactionId);
		return TM_Ok;
	}

	/* xmax transaction committed */
//到这里xmax也已经提交
	if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))
	{
		SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
					InvalidTransactionId);
		return TM_Ok;
	}

	SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
				HeapTupleHeaderGetRawXmax(tuple));
	if (!ItemPointerEquals(&htup->t_self, &tuple->t_ctid))//是更新操作
		return TM_Updated;		/* updated by other */
	else//是删除操作
		return TM_Deleted;		/* deleted by other */
}

heap_update

以heap_update为例,因为是执行的UPDATE操作,所以行锁的锁模式只有两种,流程如下:

  • 根据是否有主键更新,选择不同的锁模式
//如果没有主键的更新的话,可以申请弱一点的行锁,否则申请级别更高的行锁
if (!bms_overlap(modified_attrs, key_attrs))
	{
		*lockmode = LockTupleNoKeyExclusive;
		mxact_status = MultiXactStatusNoKeyUpdate;
		key_intact = true;
		MultiXactIdSetOldestMember();
	}
	else
	{
		*lockmode = LockTupleExclusive;
		mxact_status = MultiXactStatusUpdate;
		key_intact = false;
	}

  • 调用HeapTupleSatisfiesUpdate函数判断当前元组的状态是否可更新
result = HeapTupleSatisfiesUpdate(&oldtup, cid, buffer); //判断元组可见性,MVCC
  • 如果元组状态为TM_BeingModified,说明元组正在被更新,需要执行下面操作进行加锁。
else if (result == TM_BeingModified && wait) //元组正在被修改,切wait为TRUE时,需要等待
  • 如果是MultiXact,判断MultiXact是否与当前要等到的模式冲突,如果冲突,申请一把行锁,然后等待MultiXact结束(一直自旋),MultiXact结束后,如果元组状态发生了变化,则需要回到第二步继续判断。
if (infomask & HEAP_XMAX_IS_MULTI) //是multixact
		{
			TransactionId update_xact;
			int			remain;
			bool		current_is_member = false;

			if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
										*lockmode, &current_is_member)) //multi是否与当前要等待的模式冲突
			{
				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);//睡眠浅需要先释放buffer的锁
				if (!current_is_member) //如果我们还没有持有锁,则申请一把行锁
					heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
										 LockWaitBlock, &have_tuple_lock);

				/* wait for multixact,等待multi结束,自旋查询,直到结束 */
				MultiXactIdWait((MultiXactId) xwait, mxact_status, infomask,
								relation, &oldtup.t_self, XLTW_Update,
								&remain);
				checked_lockers = true;
				locker_remains = remain != 0;
				//此时等待的xmax已经结束
				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);//重新申请buffer的锁

				/*
				 如果其他人又修改了这个元组,也就是说xmax活t_infomask发生了变化,那么需要重新进行检查
				 */
				if (xmax_infomask_changed(oldtup.t_data->t_infomask,
										  infomask) ||
					!TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup.t_data),
										 xwait))
					goto l2;
			}
			if (!HEAP_XMAX_IS_LOCKED_ONLY(oldtup.t_data->t_infomask)) 
				update_xact = HeapTupleGetUpdateXid(oldtup.t_data); //如果还是有元组在更新该元组,获取其xmax
			else
				update_xact = InvalidTransactionId; //没有

			/*
			 此时已经没有update操作再multi中了,或者它已经终止了,可以继续
			 */
			if (!TransactionIdIsValid(update_xact) ||
				TransactionIdDidAbort(update_xact))
				can_continue = true;
		}
  • 如果正在更新该元组的事务就是当前事务,那么就不用重新申请锁了,可以继续操作

else if (TransactionIdIsCurrentTransactionId(xwait)) //如果正在更新该元组的事务就是当前事务,那么就不用重新申请锁了,可以继续往下操作了
		{
			checked_lockers = true;
			locker_remains = true;
			can_continue = true;
		}
  • 如果该元组当前存在的是key share锁,并且我们也不会修改主键行,说明不冲突,可以继续执行
		else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask) && key_intact) //如果只是存在key-share锁,并且我们也不会修改主键行,那么这里就也不需要等待了
		{
			checked_lockers = true;
			locker_remains = true;
			can_continue = true;
		}
  • 如果上面的情况都不是,说明当前只有一个事务在更新,我们只需要等待xmax事务结束就可以了,等待前需要先申请元组级常规锁,等待结束后,如果元组状态发生了变化,则需要重新回到第二步判断元组状态。
		else
		{
			/*
			 这里就是常规等待xmax结束了,不过等待之前会先申请一把行锁
			 */
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK); //睡眠之前先释放buffer上的锁
			heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
								 LockWaitBlock, &have_tuple_lock); //申请一把行锁
			XactLockTableWait(xwait, relation, &oldtup.t_self, 
							  XLTW_Update);//等待xmax结束
			checked_lockers = true;
			//xmax已经结束,重新申请buffer的锁
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

			/*
			 如果元组的xmax和infomask已经发生改变,重新检查
			 */
			if (xmax_infomask_changed(oldtup.t_data->t_infomask, infomask) ||
				!TransactionIdEquals(xwait,
									 HeapTupleHeaderGetRawXmax(oldtup.t_data)))
				goto l2;

			/* 如果存在其他情况,检查是否已经提交了*/
			UpdateXmaxHintBits(oldtup.t_data, buffer, xwait);
			if (oldtup.t_data->t_infomask & HEAP_XMAX_INVALID)
				can_continue = true;
		}
  • 满足更新元组的条件,进行元组更新操作,更新后释放元组锁
	if (have_tuple_lock)
		UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);

XactLockTableWait

等待指定的事务结束,会以死循环的方式等待,等待间隔1000us,直到要等待的事务结束为止。


	for (;;)
	{
		SET_LOCKTAG_TRANSACTION(tag, xid);
		(void) LockAcquire(&tag, ShareLock, false, false);//以共享模式申请一把事务锁
		LockRelease(&tag, ShareLock, false);//释放事务锁
		if (!TransactionIdIsInProgress(xid))//如果事务结束,则退出循环
			break;
		if (!first)
			pg_usleep(1000L);
		first = false;
		xid = SubTransGetTopmostTransaction(xid);//如果是子事务,就获取其顶层的父事务
	}

ConditionalXactLockTableWait

等待指定的事务结束,不会阻塞

bool
ConditionalXactLockTableWait(TransactionId xid)
{
	LOCKTAG		tag;
	bool		first = true;
	for (;;)
	{
		SET_LOCKTAG_TRANSACTION(tag, xid);
		if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)//尝试获取事务锁,成功与否都直接返回结果
			return false;
		LockRelease(&tag, ShareLock, false);//获取到锁,表示事务已经结束,释放锁。
		if (!TransactionIdIsInProgress(xid))//判断是否已经结束
			break;
		if (!first)
			pg_usleep(1000L);
		first = false;
		xid = SubTransGetTopmostTransaction(xid);//获取子事务顶层父事务
	}

	return true;
}

LockTupleTuplock/ConditionalLockTuple

根据行锁的模式获取对应的常规锁模式,并调用对应函数锁表

#define LockTupleTuplock(rel, tup, mode) \
	LockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
#define UnlockTupleTuplock(rel, tup, mode) \
	UnlockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
#define ConditionalLockTupleTuplock(rel, tup, mode) \
	ConditionalLockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)

LockTuple/ConditionalLockTuple

申请元组级常规锁

void
LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
{
	LOCKTAG		tag;

	SET_LOCKTAG_TUPLE(tag,
					  relation->rd_lockInfo.lockRelId.dbId,
					  relation->rd_lockInfo.lockRelId.relId,
					  ItemPointerGetBlockNumber(tid),
					  ItemPointerGetOffsetNumber(tid));//设置锁的tag

	(void) LockAcquire(&tag, lockmode, false, false);//申请锁
}

bool
ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
{
	LOCKTAG		tag;

	SET_LOCKTAG_TUPLE(tag,
					  relation->rd_lockInfo.lockRelId.dbId,
					  relation->rd_lockInfo.lockRelId.relId,
					  ItemPointerGetBlockNumber(tid),
					  ItemPointerGetOffsetNumber(tid));//设置锁模式

	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);//非阻塞模式申请锁
}

DoesMultiXactIdConflict

判断MultiXact是否与当前的事务冲突。

static bool
DoesMultiXactIdConflict(MultiXactId multi, uint16 infomask,
						LockTupleMode lockmode, bool *current_is_member)
{
	int			nmembers;
	MultiXactMember *members;
	bool		result = false;
	LOCKMODE	wanted = tupleLockExtraInfo[lockmode].hwlock;

	if (HEAP_LOCKED_UPGRADED(infomask))
		return false;

	nmembers = GetMultiXactIdMembers(multi, &members, false,
									 HEAP_XMAX_IS_LOCKED_ONLY(infomask));//获取MultiXact中的事务信息
	if (nmembers >= 0)
	{
		int			i;

		for (i = 0; i < nmembers; i++)//遍历每个事务
		{
			TransactionId memxid;
			LOCKMODE	memlockmode;

			if (result && (current_is_member == NULL || *current_is_member))
				break;

			memlockmode = LOCKMODE_from_mxstatus(members[i].status);//获取member的事务状态

			memxid = members[i].xid;
			if (TransactionIdIsCurrentTransactionId(memxid))//跳过当前事务
			{
				if (current_is_member != NULL)
					*current_is_member = true;
				continue;
			}
			else if (result)
				continue;
			if (!DoLockModesConflict(memlockmode, wanted))//与当前事务模式不冲突,跳过
				continue;

			if (ISUPDATE_from_mxstatus(members[i].status))//是更新操作
			{
				if (TransactionIdDidAbort(memxid))//member事务已经abort
					continue;
			}
			else
			{
				if (!TransactionIdIsInProgress(memxid))//member事务进程已经结束
					continue;
			}
			result = true;
		}
		pfree(members);
	}

	return result;
}

Do_MultiXactIdWait

等待MultiXact包含的所有事务结束

tatic bool
Do_MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
				   uint16 infomask, bool nowait,
				   Relation rel, ItemPointer ctid, XLTW_Oper oper,
				   int *remaining)
{
	bool		result = true;
	MultiXactMember *members;
	int			nmembers;
	int			remain = 0;
	nmembers = HEAP_LOCKED_UPGRADED(infomask) ? -1 :
		GetMultiXactIdMembers(multi, &members, false,
							  HEAP_XMAX_IS_LOCKED_ONLY(infomask));//获取MultiXact的members信息

	if (nmembers >= 0)
	{
		int			i;

		for (i = 0; i < nmembers; i++)//遍历每个member事务
		{
			TransactionId memxid = members[i].xid;
			MultiXactStatus memstatus = members[i].status;

			if (TransactionIdIsCurrentTransactionId(memxid))//跳过当前事务
			{
				remain++;
				continue;
			}

			if (!DoLockModesConflict(LOCKMODE_from_mxstatus(memstatus),
									 LOCKMODE_from_mxstatus(status)))//是否与当前事务冲突,不冲突跳过
			{
				if (remaining && TransactionIdIsInProgress(memxid))
					remain++;
				continue;
			}
			if (nowait)//如果冲突且需要不需要等待
			{
				result = ConditionalXactLockTableWait(memxid);//判断事务是否结束,立即返回结果
				if (!result)
					break;
			}
			else
				XactLockTableWait(memxid, rel, ctid, oper);//等待事务结束
		}

		pfree(members);
	}

	if (remaining)
		*remaining = remain;

	return result;
}
 

heap_lock_tuple

以共享模式或排他模式锁住一个元组

  • 调用HeapTupleSatisfiesUpdate是否满足更新条件,只有结果为TM_BeingModified、TM_UPdated、TM_Deleted时才需要处理。
	result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer);//判断是否满足更新条件

	if (result == TM_Invisible)
	{
		result = TM_Invisible;
		goto out_locked;
	}
	else if (result == TM_BeingModified ||
			 result == TM_Updated ||
			 result == TM_Deleted)
	{
  • 因为可能会循环从上一步开始检查,所以有些需要第一次才需要执行的操作,操作流程如下:

    • 如果是MultiXact,遍历每个member事务,如果存在比当前要申请的锁模式更高的锁,则不比再申请了,直接调到结束即可;如果没有就设置skip_tuple_lock为true,后面就不需要再申请更高级的锁了,因为可能会与其他已经申请了强锁但是在等待我们事务结束的事务产生死锁冲突,这里只用等待MultiXact事务结束即可
     if (infomask & HEAP_XMAX_IS_MULTI)//是MultiXact
     		{
     			int			i;
     			int			nmembers;
     			MultiXactMember *members;
     			nmembers =
     				GetMultiXactIdMembers(xwait, &members, false,
     									  HEAP_XMAX_IS_LOCKED_ONLY(infomask));//获取MultiXact的成员
    
     			for (i = 0; i < nmembers; i++)
     			{
     				if (!TransactionIdIsCurrentTransactionId(members[i].xid))//跳过当前的事务ID
     					continue;
    
     				if (TUPLOCK_from_mxstatus(members[i].status) >= mode)//如果成员的权限大于当前要申请的,直接结束
     				{
     					pfree(members);
     					result = TM_Ok;
     					goto out_unlocked;
     				}
     				else
     				{
     					skip_tuple_lock = true;
     				}
     			}
    
     			if (members)
     				pfree(members);
     		}    
    
    • 如果xmax就是当前的事务,这里判断一下锁模式与对应的t_infomask标记是否正确,正确的话直接调到结束位置即可
    		else if (TransactionIdIsCurrentTransactionId(xwait))//如果xmax是当前事务,根据锁模式判断t_infomask标记
    		{
    			switch (mode)
    			{
    				case LockTupleKeyShare:
    					Assert(HEAP_XMAX_IS_KEYSHR_LOCKED(infomask) ||
    						   HEAP_XMAX_IS_SHR_LOCKED(infomask) ||
    						   HEAP_XMAX_IS_EXCL_LOCKED(infomask));
    					result = TM_Ok;
    					goto out_unlocked;
    				case LockTupleShare:
    					if (HEAP_XMAX_IS_SHR_LOCKED(infomask) ||
    						HEAP_XMAX_IS_EXCL_LOCKED(infomask))
    					{
    						result = TM_Ok;
    						goto out_unlocked;
    					}
    					break;
    				case LockTupleNoKeyExclusive:
    					if (HEAP_XMAX_IS_EXCL_LOCKED(infomask))
    					{
    						result = TM_Ok;
    						goto out_unlocked;
    					}
    					break;
    				case LockTupleExclusive:
    					if (HEAP_XMAX_IS_EXCL_LOCKED(infomask) &&
    						infomask2 & HEAP_KEYS_UPDATED)
    					{
    						result = TM_Ok;
    						goto out_unlocked;
    					}
    					break;
    			}
    		}	
    
  • 我们不得不等待持锁事务结束,下面根据锁模式不同,进行不同的处理

    • 申请的是LockTupleKeyShare锁
      • 如果没有键值被更新,则与申请的锁不冲突,但是如果还是有更新操作,且入参fllow_updates为true(表示要锁定更新的版本链),还需要调用heap_lock_updated_tuple函数锁住更新后的元组。
      			if (!(infomask2 & HEAP_KEYS_UPDATED))//没有键值被更新,不冲突
      	{
      		bool		updated;
      		updated = !HEAP_XMAX_IS_LOCKED_ONLY(infomask);//还有更新操作
      
      		if (follow_updates && updated)//要跟踪更新链并锁住后面的元组
      		{
      			TM_Result	res;
      
      			res = heap_lock_updated_tuple(relation, tuple, &t_ctid,
      										  GetCurrentTransactionId(),
      										  mode);//锁住更新链后面的元组
      			if (res != TM_Ok)
      			{
      				result = res;
      				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
      				goto failed;
      			}
      		}
      
      • 再次检查,如果元组事务中还有更新操作,且有键值更新,则回到第一步重新遍历
      				if (!HeapTupleHeaderIsOnlyLocked(tuple->t_data) &&
      			((tuple->t_data->t_infomask2 & HEAP_KEYS_UPDATED) ||
      			 !updated))//元组不仅仅被锁住,且有键值更新,则重新遍历
      			goto l3;
      
      • 没有冲突,可以不用睡眠等待其他事务结束,require_sleep设置为false
      require_sleep = false;
      
    • 申请的是LockTupleShare锁
      如果当前元组只是被锁住而没有排他锁,则也不需要睡眠等待
    		else if (mode == LockTupleShare)//申请的是SHARE 锁
    	{
    		if (HEAP_XMAX_IS_LOCKED_ONLY(infomask) &&
    			!HEAP_XMAX_IS_EXCL_LOCKED(infomask))//只是被锁住且没有排他锁
    		{
    			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
    			if (!HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask) ||
    				HEAP_XMAX_IS_EXCL_LOCKED(tuple->t_data->t_infomask))//还有更新操作或有排他锁,重新检查
    				goto l3;
    			require_sleep = false;
    		}  
    
    • 申请的是LockTupleNoKeyExclusive锁
      • 如果是MultiXact,则需要判断MultiXact所有成员事务是否与当前事务申请的锁冲突,如果不冲突的话,也不需要睡眠
      • 如果是单一的事务且时Key SHARE锁,也不需要睡眠
    			if (infomask & HEAP_XMAX_IS_MULTI)//是MultiXact
    		{
    			if (!DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
    										 mode, NULL))//判断MultiXact是否与当前事务申请的锁冲突,不冲突的话
    			{
    				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
    				if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
    					!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
    										 xwait))//如果xmax更新过或者跟当前的不一致,则重新检查
    					goto l3;
    				require_sleep = false;
    			}
    		}
    		else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))//如果添加的有keyshare锁
    		{
    			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
    			if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
    				!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
    									 xwait))//如果xmax有变更过,重新检查
    				goto l3;
    			require_sleep = false;
    		}
    
  • 如果当前事务是该元组的唯一持锁拥有者,我们也可以避免睡眠

		if (require_sleep && !(infomask & HEAP_XMAX_IS_MULTI) &&
			TransactionIdIsCurrentTransactionId(xwait))
		{
			/* ... but if the xmax changed in the meantime, start over */
			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
			if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
				!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
									 xwait))
				goto l3;
			Assert(HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask));
			require_sleep = false;
		}
  • 如果元组已经被其他事务删除或更新且事务已经提交,而require_sleep又是true的话,直接调到出错处理地方
	 //如果元组已经被其他事务删除或更新且已经提交,这里还要求睡眠的话就直接失败
		if (require_sleep && (result == TM_Updated || result == TM_Deleted))
		{
			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
			goto failed;
		}
  • 如果需要睡眠等待,执行下面操作
    • 调用heap_acquire_tuplock函数尝试申请元组级常规锁,如果申请失败,跳到申请失败位置
    		if (!skip_tuple_lock &&
    			!heap_acquire_tuplock(relation, tid, mode, wait_policy,
    								  &have_tuple_lock))//尝试申请元组级锁
    		{
    			result = TM_WouldBlock;
    			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
    			goto failed;
    		}
    
    • 如果是MultiXact,根据等待策略,执行对应的等待函数,等待MultiXact的事务结束
      • 如果是LockWaitBlock,则调用MultiXactIdWait函数阻塞等待事务结束
      • 如果是LockWaitSkip,则调用ConditionalMultiXactIdWait返回查询结果,不会等待
      • 如果是LockWaitError,则调用ConditionalMultiXactIdWait返回查询结果,如果无法获取到锁,则报错退出。
    			if (infomask & HEAP_XMAX_IS_MULTI)
    		{
    			MultiXactStatus status = get_mxact_status_for_lock(mode, false);
    
    			if (status >= MultiXactStatusNoKeyUpdate)
    				elog(ERROR, "invalid lock mode in heap_lock_tuple");
    
    			//根据更待策略,去等待锁
    			switch (wait_policy)
    			{
    				case LockWaitBlock:
    					MultiXactIdWait((MultiXactId) xwait, status, infomask,
    									relation, &tuple->t_self, XLTW_Lock, NULL);//等待MultiXact包含的事务结束,阻塞等待
    					break;
    				case LockWaitSkip:
    					if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
    													status, infomask, relation,
    													NULL))//等待MultiXact包含的事务结束,不阻塞等待,立即返回结果
    					{
    						result = TM_WouldBlock;
    						LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
    						goto failed;
    					}
    					break;
    				case LockWaitError:
    					if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
    													status, infomask, relation,
    													NULL))//等待MultiXact包含的事务结束,不阻塞等待,立即返回结果,失败的直接报错退出
    						ereport(ERROR,
    								(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
    								 errmsg("could not obtain lock on row in relation \"%s\"",
    										RelationGetRelationName(relation))));
    
    					break;
    			} 
    
    • 单一事务的情况,就是等待锁释放,根据等待策略,执行对应的等待函数
      • 如果是LockWaitBlock,则调用XactLockTableWait函数阻塞等待事务结束
      • 如果是LockWaitSkip,则调用ConditionalXactLockTableWait返回查询结果,不会等待
      • 如果是LockWaitError,则调用ConditionalXactLockTableWait返回查询结果,如果无法获取到锁,则报错退出。
    			else//等待锁释放
    		{
    			/* wait for regular transaction to end, or die trying */
    			switch (wait_policy)
    			{
    				case LockWaitBlock:
    					XactLockTableWait(xwait, relation, &tuple->t_self,
    									  XLTW_Lock);//阻塞等待xmax对应的事务结束
    					break;
    				case LockWaitSkip:
    					if (!ConditionalXactLockTableWait(xwait))//不阻塞等待xmax对应的事务结束
    					{
    						result = TM_WouldBlock;
    						/* recovery code expects to have buffer lock held */
    						LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
    						goto failed;
    					}
    					break;
    				case LockWaitError:
    					if (!ConditionalXactLockTableWait(xwait))//不阻塞等待xmax对应的事务结束,失败就报错退出
    						ereport(ERROR,
    								(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
    								 errmsg("could not obtain lock on row in relation \"%s\"",
    										RelationGetRelationName(relation))));
    					break;
    			}
    		} 
    
    • 如果事务有更新操作,且follow_updates为true,则需要调用heap_lock_updated_tuple函数,锁住更新后的元组
    		//如果 有更新,且需要跟踪并锁定更新的版本链
    		if (follow_updates && !HEAP_XMAX_IS_LOCKED_ONLY(infomask))
    		{
    			TM_Result	res;
    
    			res = heap_lock_updated_tuple(relation, tuple, &t_ctid,
    										  GetCurrentTransactionId(),
    										  mode);//锁住更新的版本链
    			if (res != TM_Ok)
    			{
    				result = res;
    				/* recovery code expects to have buffer lock held */
    				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
    				goto failed;
    			}
    		}
    
    • 再次判断xmax是否有变更,如果有,需要返回第一步重新检查
    • xmax事务已经结束,更新元组的标记位
    		if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
    			!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
    								 xwait))//xmax有更新且不是自己  回去继续循环
    			goto l3;
    
    		if (!(infomask & HEAP_XMAX_IS_MULTI))//更新xmax为当前的事务,更新标记位
    		{
    			UpdateXmaxHintBits(tuple->t_data, *buffer, xwait);
    		}
    
  • 到这里,我们就已经确定拿到了排他锁了,然后根据元组的状态返回结果
    • 如果xmax是0,或者元组只是被上锁,返回TM_OK
    • 如果当前元组的ctid与t_self不一致,表名元组已经被更新过,返回TM_Updated
    • 如果元组已经被删除,返回TM_deleted
		if (!require_sleep ||
			(tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
			HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask) ||
			HeapTupleHeaderIsOnlyLocked(tuple->t_data))
			result = TM_Ok;
		else if (!ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid))
			result = TM_Updated;
		else
			result = TM_Deleted;
  • failed的异常处理
    • 更新元组信息
    • 重新固定vm页
    • 重新计算xmax值或者MultiXactID
    • 更新元组头信息
    • 缓冲区标记为脏
    • 写WAL日志
if (result != TM_Ok)
	{
		tmfd->ctid = tuple->t_data->t_ctid;
		tmfd->xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
		if (result == TM_SelfModified)
			tmfd->cmax = HeapTupleHeaderGetCmax(tuple->t_data);
		else
			tmfd->cmax = InvalidCommandId;
		goto out_locked;
	}
	if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
	{
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		visibilitymap_pin(relation, block, &vmbuffer);
		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
		goto l3;
	}

	xmax = HeapTupleHeaderGetRawXmax(tuple->t_data);
	old_infomask = tuple->t_data->t_infomask;
	MultiXactIdSetOldestMember();
	compute_new_xmax_infomask(xmax, old_infomask, tuple->t_data->t_infomask2,
							  GetCurrentTransactionId(), mode, false,
							  &xid, &new_infomask, &new_infomask2);//重新计算xmax或MultiXact

	START_CRIT_SECTION();
	 //更新元组头信息
	tuple->t_data->t_infomask &= ~HEAP_XMAX_BITS;
	tuple->t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
	tuple->t_data->t_infomask |= new_infomask;
	tuple->t_data->t_infomask2 |= new_infomask2;
	if (HEAP_XMAX_IS_LOCKED_ONLY(new_infomask))
		HeapTupleHeaderClearHotUpdated(tuple->t_data);
	HeapTupleHeaderSetXmax(tuple->t_data, xid);
	if (HEAP_XMAX_IS_LOCKED_ONLY(new_infomask))
		tuple->t_data->t_ctid = *tid;
	if (PageIsAllVisible(page) &&
		visibilitymap_clear(relation, block, vmbuffer,
							VISIBILITYMAP_ALL_FROZEN))
		cleared_all_frozen = true;
	MarkBufferDirty(*buffer);//缓冲区标记为脏
	if (RelationNeedsWAL(relation))//写XLOG日志
	{
		xl_heap_lock xlrec;
		XLogRecPtr	recptr;

		XLogBeginInsert();
		XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD);

		xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
		xlrec.locking_xid = xid;
		xlrec.infobits_set = compute_infobits(new_infomask,
											  tuple->t_data->t_infomask2);
		xlrec.flags = cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;
		XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();

	result = TM_Ok;

  • 解锁缓冲区,如果有tuple lock的话,解锁
out_unlocked:
	if (BufferIsValid(vmbuffer))
		ReleaseBuffer(vmbuffer);

	/*
	 * Don't update the visibility map here. Locking a tuple doesn't change
	 * visibility info.
	 */

	/*
	 * Now that we have successfully marked the tuple as locked, we can
	 * release the lmgr tuple lock, if we had it.
	 */
	if (have_tuple_lock)
		UnlockTupleTuplock(relation, tid, mode);

heap_lock_updated_tuple

在更新后的元组上申请锁,如果元组还没有被更新或者挪到其他分区中,就停止,否则就调用heap_lock_updated_tuple_rec申请锁

heap_lock_updated_tuple_rec

遍历找出最新的元组版本然后上锁

static TM_Result
heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid,
							LockTupleMode mode)
{
	TM_Result	result;
	ItemPointerData tupid;
	HeapTupleData mytup;
	Buffer		buf;
	uint16		new_infomask,
				new_infomask2,
				old_infomask,
				old_infomask2;
	TransactionId xmax,
				new_xmax;
	TransactionId priorXmax = InvalidTransactionId;
	bool		cleared_all_frozen = false;
	bool		pinned_desired_page;
	Buffer		vmbuffer = InvalidBuffer;
	BlockNumber block;

	ItemPointerCopy(tid, &tupid);

	for (;;)
	{
		new_infomask = 0;
		new_xmax = InvalidTransactionId;
		block = ItemPointerGetBlockNumber(&tupid);
		ItemPointerCopy(&tupid, &(mytup.t_self));

		if (!heap_fetch(rel, SnapshotAny, &mytup, &buf))//通过ctid找到对应的元组
		{
			result = TM_Ok;
			goto out_unlocked;
		}

l4:
		CHECK_FOR_INTERRUPTS();
		if (PageIsAllVisible(BufferGetPage(buf)))
		{
			visibilitymap_pin(rel, block, &vmbuffer);//钉住vm
			pinned_desired_page = true;
		}
		else
			pinned_desired_page = false;

		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);//buffer上锁
		if (!pinned_desired_page && PageIsAllVisible(BufferGetPage(buf)))
		{
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			visibilitymap_pin(rel, block, &vmbuffer);//重新盯一下
			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
		}
		if (TransactionIdIsValid(priorXmax) &&
			!TransactionIdEquals(HeapTupleHeaderGetXmin(mytup.t_data),
								 priorXmax))//判断xmax是否为0,为0表示到达最新的元组位置,判断当前元组xmin是否等于上一版本的xmax
		{
			result = TM_Ok;//到达最新位置,返回
			goto out_locked;
		}
		if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(mytup.t_data)))//该元组无效,返回
		{
			result = TM_Ok;
			goto out_locked;
		}

		old_infomask = mytup.t_data->t_infomask;
		old_infomask2 = mytup.t_data->t_infomask2;
		xmax = HeapTupleHeaderGetRawXmax(mytup.t_data);

		/*
		 如果当前元组已经被更新或者被一些正在运行的事务锁住,我们需要判断那些事务是否与我们要持有的锁冲突
		 */
		if (!(old_infomask & HEAP_XMAX_INVALID))//已经被更新
		{
			TransactionId rawxmax;
			bool		needwait;

			rawxmax = HeapTupleHeaderGetRawXmax(mytup.t_data);//获取xmax
			if (old_infomask & HEAP_XMAX_IS_MULTI)//如果是MultiXact
			{
				int			nmembers;
				int			i;
				MultiXactMember *members;
				Assert(!HEAP_LOCKED_UPGRADED(mytup.t_data->t_infomask));

				nmembers = GetMultiXactIdMembers(rawxmax, &members, false,
												 HEAP_XMAX_IS_LOCKED_ONLY(old_infomask));
				for (i = 0; i < nmembers; i++)//遍历每个member事务,检测是否与当前要持有的锁冲突
				{
					result = test_lockmode_for_conflict(members[i].status,
														members[i].xid,
														mode,
														&mytup,
														&needwait);//检查锁模式是否与member事务冲突

					if (result == TM_SelfModified)//已经被当前事务修改过了,下一个
					{
						pfree(members);
						goto next;
					}

					if (needwait)//需要等待持锁事务结束
					{
						LockBuffer(buf, BUFFER_LOCK_UNLOCK);
						XactLockTableWait(members[i].xid, rel,
										  &mytup.t_self,
										  XLTW_LockUpdated);
						pfree(members);
						goto l4;
					}
					if (result != TM_Ok)//不冲突
					{
						pfree(members);
						goto out_locked;
					}
				}
				if (members)
					pfree(members);
			}
			else//单一事务情况
			{
				MultiXactStatus status;
				if (HEAP_XMAX_IS_LOCKED_ONLY(old_infomask))//根据标记位判断锁模式
				{
					if (HEAP_XMAX_IS_KEYSHR_LOCKED(old_infomask))
						status = MultiXactStatusForKeyShare;
					else if (HEAP_XMAX_IS_SHR_LOCKED(old_infomask))
						status = MultiXactStatusForShare;
					else if (HEAP_XMAX_IS_EXCL_LOCKED(old_infomask))
					{
						if (old_infomask2 & HEAP_KEYS_UPDATED)
							status = MultiXactStatusForUpdate;
						else
							status = MultiXactStatusForNoKeyUpdate;
					}
					else
					{
						elog(ERROR, "invalid lock status in tuple");
					}
				}
				else
				{
					/* it's an update, but which kind? */
					if (old_infomask2 & HEAP_KEYS_UPDATED)
						status = MultiXactStatusUpdate;
					else
						status = MultiXactStatusNoKeyUpdate;
				}

				result = test_lockmode_for_conflict(status, rawxmax, mode,
													&mytup, &needwait);//检测是否冲突
				if (result == TM_SelfModified)//被当前事务修改过
					goto next;

				if (needwait)//需要等待事务结束
				{
					LockBuffer(buf, BUFFER_LOCK_UNLOCK);
					XactLockTableWait(rawxmax, rel, &mytup.t_self,
									  XLTW_LockUpdated);
					goto l4;
				}
				if (result != TM_Ok)//Ok
				{
					goto out_locked;
				}
			}
		}

		/* compute the new Xmax and infomask values for the tuple ... */
		compute_new_xmax_infomask(xmax, old_infomask, mytup.t_data->t_infomask2,
								  xid, mode, false,
								  &new_xmax, &new_infomask, &new_infomask2);//重新计算xmax

		if (PageIsAllVisible(BufferGetPage(buf)) &&
			visibilitymap_clear(rel, block, vmbuffer,
								VISIBILITYMAP_ALL_FROZEN))
			cleared_all_frozen = true;

		START_CRIT_SECTION();

		/* ... and set them 更新标记位*/
		HeapTupleHeaderSetXmax(mytup.t_data, new_xmax);
		mytup.t_data->t_infomask &= ~HEAP_XMAX_BITS;
		mytup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
		mytup.t_data->t_infomask |= new_infomask;
		mytup.t_data->t_infomask2 |= new_infomask2;

		MarkBufferDirty(buf);//buffer标记为脏

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))//写WAL日志
		{
			xl_heap_lock_updated xlrec;
			XLogRecPtr	recptr;
			Page		page = BufferGetPage(buf);

			XLogBeginInsert();
			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);

			xlrec.offnum = ItemPointerGetOffsetNumber(&mytup.t_self);
			xlrec.xmax = new_xmax;
			xlrec.infobits_set = compute_infobits(new_infomask, new_infomask2);
			xlrec.flags =
				cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;

			XLogRegisterData((char *) &xlrec, SizeOfHeapLockUpdated);

			recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED);

			PageSetLSN(page, recptr);
		}

		END_CRIT_SECTION();

next:
		/* if we find the end of update chain, we're done. */
		if (mytup.t_data->t_infomask & HEAP_XMAX_INVALID ||
			HeapTupleHeaderIndicatesMovedPartitions(mytup.t_data) ||
			ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid) ||
			HeapTupleHeaderIsOnlyLocked(mytup.t_data))
		{
			result = TM_Ok;
			goto out_locked;
		}

		/* tail recursion */
		priorXmax = HeapTupleHeaderGetUpdateXid(mytup.t_data);
		ItemPointerCopy(&(mytup.t_data->t_ctid), &tupid);//切换到下一个版本元组(即ctid指向的下一个元组)
		UnlockReleaseBuffer(buf);//缓冲区解压
	}

	result = TM_Ok;

out_locked:
	UnlockReleaseBuffer(buf);

out_unlocked:
	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);//释放锁

	return result;
}

test_lockmode_for_conflict

检测给定的事务与给定的锁模式是否冲突

 static TM_Result
test_lockmode_for_conflict(MultiXactStatus status, TransactionId xid,
						   LockTupleMode mode, HeapTuple tup,
						   bool *needwait)
{
	MultiXactStatus wantedstatus;

	*needwait = false;
	wantedstatus = get_mxact_status_for_lock(mode, false);//获取锁模式对应的行锁模式
	if (TransactionIdIsCurrentTransactionId(xid))//如果就是当前事务,直接返回被自己修改过即可
	{
		return TM_SelfModified;
	}
	else if (TransactionIdIsInProgress(xid))//如果事务正在运行中
	{
		if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
								LOCKMODE_from_mxstatus(wantedstatus)))//检查是否与我们依赖的锁模式冲突
		{
			*needwait = true;
		}
		return TM_Ok;
	}
	else if (TransactionIdDidAbort(xid))//是否如果已经异常终止,不冲突
		return TM_Ok;
	else if (TransactionIdDidCommit(xid))//如果事务已经提交
	{
		if (!ISUPDATE_from_mxstatus(status))//不是元组更新的事务,不冲突
			return TM_Ok;

		if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
								LOCKMODE_from_mxstatus(wantedstatus)))//判断是否与锁模式冲突
		{
			if (!ItemPointerEquals(&tup->t_self, &tup->t_data->t_ctid))//ctid不一致,已经被更新
				return TM_Updated;
			else//已经被删除
				return TM_Deleted;
		}

		return TM_Ok;
	}
	return TM_Ok;
}

【参考】

  1. 《PostgreSQL数据库内核分析》
  2. 《Postgresql技术内幕-事务处理深度探索》
  3. 《PostgreSQL指南:内幕探索》
  4. pg14源码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2165910.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity 新导航寻路演示(2)

对于静态场景来说&#xff0c;只需要3步 1.为场景Ground添加网格表面组件并烘焙 2.为player添加导航代理 using System.Collections; using System.Collections.Generic; using UnityEngine; using UnityEngine.AI;public class PlayerMove : MonoBehaviour {private NavMes…

2D动画转3D角色!无需建模- comfyUI工作流一键生成3d效果图!

如何将2d角色转化成3d角色&#xff1f; 不需要建模&#xff0c;通过一个2d转3d的工作流可以直接将你的2d图片转化成3d效果图。 而且操作特别简单&#xff0c;只需要3个步骤&#xff0c;这篇内容我们来说下这个工作路的使用 工作流特点 任意2D图片转换成3D风格 基于sd1.5模型…

ftdi_sio驱动学习笔记 3 - 端口操作

目录 1. ftdi_port_probe 1.1 私有数据结构ftdi_private 1.2 特殊probe处理 1.3 确定FTDI设备类型 1.4 确定最大数据包大小 1.5 设置读取延迟时间 1.6 初始化GPIO 1.6.1 使能GPIO 1.6.2 添加到系统 1.6.2.1 设置GPIO控制器的基本信息 1.6.2.2 设置GPIO控制器的元信息…

Apache Iceberg 与 Spark整合-使用教程(Iceberg 官方文档解析)

官方文档链接&#xff08;Spark整合Iceberg&#xff09; 1.Getting Started Spark 目前是进行 Iceberg 操作最丰富的计算引擎。官方建议从 Spark 开始&#xff0c;以理解 Iceberg 的概念和功能。 The latest version of Iceberg is 1.6.1.&#xff08;2024年9月24日11:45:55&…

如何在云端使用 Browserless 进行网页抓取?

云浏览器是什么&#xff1f; 云浏览器是一种基于云的组合&#xff0c;它将网页浏览器应用程序与一个虚拟化的容器相结合&#xff0c;实现了远程浏览器隔离的概念。开发人员可以使用流行的工具&#xff08;如 Playwright 和​ Puppeteer​&#xff09;来自动化网页浏览器&#…

repo 查看指定日期内,哪些仓库有修改,具体的修改详情

文章目录 想看指定时间段内仓库中修改了哪些具体的文件&#xff0c;是谁修改的&#xff0c;commit的备注信息等详情只想看某段时间内有更改的仓库的修改详情&#xff0c;其他没有修改的仓库不显示。 想看指定时间段内仓库中修改了哪些具体的文件&#xff0c;是谁修改的&#xf…

VSCode#include头文件时找不到头文件:我的解决方法

0.前言 1.在学习了Linux之后&#xff0c;我平常大部分都使用本地的XShell或者VSCode连接远程云服务器写代码&#xff0c;CentOS的包管理器为我省去了不少繁琐的事情&#xff0c;今天使用vscode打开本地目录想写点代码发现#include头文件后&#xff0c;下方出现了波浪线&#…

SparkSQL-初识

一、概览 Spark SQL and DataFrames - Spark 3.5.2 Documentation 我们先看下官网的描述&#xff1a; SparkSQL是用于结构化数据处理的Spark模块&#xff0c;与基本的Spark RDD API不同。Spark SQL提供的接口为Spark提供了更多关于正在执行的数据和计算结构的信息。在内部&a…

C++中vector类的使用

目录 1.vector类常用接口说明 1.1默认成员函数 1.1.1构造函数(constructor) 1.1.2 赋值运算符重载(operator()) 2. vector对象的访问及遍历操作(Iterators and Element access) 3.vector类对象的容量操作(Capacity) 4. vector类对象的修改及相关操作(Modifiers and Stri…

【Java数据结构】 ---对象的比较

乐观学习&#xff0c;乐观生活&#xff0c;才能不断前进啊&#xff01;&#xff01;&#xff01; 我的主页&#xff1a;optimistic_chen 我的专栏&#xff1a;c语言 &#xff0c;Java 欢迎大家访问~ 创作不易&#xff0c;大佬们点赞鼓励下吧~ 前言 上图中&#xff0c;线性表、堆…

[Redis][主从复制][上]详细讲解

目录 0.前言1.配置1.建立复制2.断开复制3.安全性4.只读5.传输延迟 2.拓扑1.一主一从结构2.一主多从结构2.树形主从结构 0.前言 说明&#xff1a;该章节相关操作不需要记忆&#xff0c;理解流程和原理即可&#xff0c;用的时候能自主查到即可主从复制&#xff1f; 分布式系统中…

PyTorch自定义学习率调度器实现指南

在深度学习训练过程中&#xff0c;学习率调度器扮演着至关重要的角色。这主要是因为在训练的不同阶段&#xff0c;模型的学习动态会发生显著变化。 在训练初期&#xff0c;损失函数通常呈现剧烈波动&#xff0c;梯度值较大且不稳定。此阶段的主要目标是在优化空间中快速接近某…

ResNet残差网络:深度学习的里程碑

引言 在深度学习领域&#xff0c;卷积神经网络&#xff08;CNN&#xff09;的发展一直推动着图像识别、目标检测等任务的进步。然而&#xff0c;随着网络层数的增加&#xff0c;传统的CNN面临着梯度消失和梯度爆炸等难题&#xff0c;限制了深层网络的训练效果。为了克服这些挑…

oracle direct path read处理过程

文章目录 缘起处理过程1.AWR Report 分析2.调查direct path read发生的table3.获取sql text4.解释sql并输出执行计划&#xff1a; 结论&#xff1a;补充direct path read等待事件说明 缘起 记录direct path read处理过程 处理过程 1.AWR Report 分析 问题发生时间段awr如下…

FortiGate OSPF动态路由协议配置

1.目的 本文档针对 FortiGate 的 OSPF 动态路由协议说明。OSPF 路由协议是一种 典型的链路状态(Link-state)的路由协议,一般用于同一个路由域内。在这里,路由 域是指一个自治系统,即 AS,它是指一组通过统一的路由政策或路由协议互相交 换路由信息的网络。在这个 AS 中,所有的 …

基于JSP+Servlet+Layui实现的博客系统

> 这是一个使用 Java 和 JSP 开发的博客系统&#xff0c;并使用 Layui 作为前端框架。 > 它包含多种功能&#xff0c;比如文章发布、评论管理、用户管理等。 > 它非常适合作为 Java 初学者的练习项目。 一、项目演示 - 博客首页 - 加载动画 - 右侧搜索框可以输入…

开源服务器管理软件Nexterm

什么是 Nexterm &#xff1f; Nexterm 是一款用于 SSH、VNC 和 RDP 的开源服务器管理软件。 安装 在群晖上以 Docker 方式安装。 在注册表中搜索 nexterm &#xff0c;选择第一个 germannewsmaker/nexterm&#xff0c;版本选择 latest。 本文写作时&#xff0c; latest 版本对…

【STM32】RTT-Studio中HAL库开发教程七:IIC通信--EEPROM存储器FM24C04

文章目录 一、简介二、模拟IIC时序三、读写流程四、完整代码五、测试验证 一、简介 FM24C04D&#xff0c;4K串行EEPROM&#xff1a;内部32页&#xff0c;每个16字节&#xff0c;4K需要一个11位的数据字地址进行随机字寻址。FM24C04D提供4096位串行电可擦除和可编程只读存储器&a…

Excel 设置自动换行

背景 版本&#xff1a;office 专业版 11.0 表格内输入长信息&#xff0c;发现默认状态时未自动换行的&#xff0c;找了很久设置按钮&#xff0c;遂总结成经验帖。 操作 1&#xff09;选中需设置的单元格/区域/行/列。 2&#xff09;点击【开始】下【对齐方式】中的【自动换…

HAproxy,nginx实现七层负载均衡

环境准备&#xff1a; 192.168.88.25 &#xff08;client&#xff09; 192.168.88.26 &#xff08;HAproxy&#xff09; 192.168.88.27 &#xff08;web1&#xff09; 192.168.88.28 (web2) 192.168.88.29 &#xff08;php1&#xff09; 192.168.88.30…