经过前两篇文章对brin index的讲解, 对brin index的构建更深入的认识,这些内容是理解索引更新流程的重点,相关知识点见postgres源码解析54 Brin Index–1 postgres源码解析55 Brin Index–2(brinbuild流程)。由于brin index是块范围索引,因此在插入、更新或者删除操作时,并不会实时进行维护索引信息;如下图,插入12与删除12均不会影响第三个brin index,可能会对统计信息产生影响,并不会影响数据正确性。
只有插入或者更新的数据超出原brin index边界范围内,才会触发更新动作。涉及的重点函数为 brininsert --> brin_form_tuple --> brin_doupdate --> PageIndexTupleOverwrite
brininsert
1 首先调用 brinRevmapInitialize函数初始brin index 映射页的访问对象BrinRevmap;
2 确定插入的heap元组落在哪个pagesPerRange,后续的更新操作均落在此页域中;
3 调用 brinGetTupleForHeapBlock获取该页域对应的磁盘形式brin index索引brtup;
4 如果brtup为空,退出循环
5 紧接着调用brin_deform_tuple函数将磁盘形式的brin index 转换成内存形式BrinMemTuple;
6 依次遍历索引元组的所有属性,判断插入堆元组数值是否落在索引元组区间内;如果在此区间内,设置标识need_insert 为false;如果超出此区间,则将发生变动的值更新至BrinMemTuple中,并设置标识need_insert 为true;
7 如果need_insert为false, 则释放锁资源;
8 调用 brin_copy_tuple函数拷贝brtup作为副本origtup,避免长时间持有锁;
9 然后将BrinMemTuple转换成磁盘形式brin index元组,判断是否可以同页更新,设置samepage标识;
10 释放缓冲块共享锁,调用brin_doupdate函数进行真正地更新操作;
11最后释放锁资源和内存资源。
brin_deform_tuple
该函数是将brin tuple从磁盘格式装换成内存格式,真正的实现函数为brin_deconstruct_tuple。
注:为避免在一循环中多次调用此函数频繁申请内存BrinMemTuple,提出一个优化手段。重复使用先前申请的内存BrinMemTuple,只需进行简单的初始化工作即可,测试验证此优化十分高效。
1 根据入参dMemtuple判断是否重用BrinMemTuple内存,可重用则只需进行简单地初始化工作,反之需要申请BrinMemTuple内存空间用于装载磁盘形式brin tuple数值信息。
2 获取tuple的数据域头部地址tp,判断tuple是否存在NULL值;若存在则需要获取tuple的bitmap地址nullbits;
3 调用brin_deconstruct_tuple函数将磁盘形式的brin tuple转换为内存形式。期执行流程如下:
/*
* brin_deconstruct_tuple
* Guts of attribute extraction from an on-disk BRIN tuple.
*
* Its arguments are:
* brdesc BRIN descriptor for the stored tuple
* tp pointer to the tuple data area
* nullbits pointer to the tuple nulls bitmask
* nulls "has nulls" bit in tuple infomask
* values output values, array of size brdesc->bd_totalstored
* allnulls output "allnulls", size brdesc->bd_tupdesc->natts
* hasnulls output "hasnulls", size brdesc->bd_tupdesc->natts
*
* Output arrays must have been allocated by caller.
*/
static inline void
brin_deconstruct_tuple(BrinDesc *brdesc,
char *tp, bits8 *nullbits, bool nulls,
Datum *values, bool *allnulls, bool *hasnulls)
{
int attnum;
int stored;
TupleDesc diskdsc;
long off;
/*
* First iterate to natts to obtain both null flags for each attribute.
* Note that we reverse the sense of the att_isnull test, because we store
* 1 for a null value (rather than a 1 for a not null value as is the
* att_isnull convention used elsewhere.) See brin_form_tuple.
*/
for (attnum = 0; attnum < brdesc->bd_tupdesc->natts; attnum++)
{
/*
* the "all nulls" bit means that all values in the page range for
* this column are nulls. Therefore there are no values in the tuple
* data area.
*/
allnulls[attnum] = nulls && !att_isnull(attnum, nullbits);
/*
* the "has nulls" bit means that some tuples have nulls, but others
* have not-null values. Therefore we know the tuple contains data
* for this column.
*
* The hasnulls bits follow the allnulls bits in the same bitmask.
*/
hasnulls[attnum] =
nulls && !att_isnull(brdesc->bd_tupdesc->natts + attnum, nullbits);
}
/*
* Iterate to obtain each attribute's stored values. Note that since we
* may reuse attribute entries for more than one column, we cannot cache
* offsets here.
*/
diskdsc = brtuple_disk_tupdesc(brdesc);
stored = 0;
off = 0;
for (attnum = 0; attnum < brdesc->bd_tupdesc->natts; attnum++)
{
int datumno;
if (allnulls[attnum])
{
stored += brdesc->bd_info[attnum]->oi_nstored;
continue;
}
for (datumno = 0;
datumno < brdesc->bd_info[attnum]->oi_nstored;
datumno++)
{
Form_pg_attribute thisatt = TupleDescAttr(diskdsc, stored);
if (thisatt->attlen == -1)
{
off = att_align_pointer(off, thisatt->attalign, -1,
tp + off);
}
else
{
/* not varlena, so safe to use att_align_nominal */
off = att_align_nominal(off, thisatt->attalign);
}
values[stored++] = fetchatt(thisatt, tp + off);
off = att_addlength_pointer(off, thisatt->attlen, tp + off);
}
}
}
brin_doupdate
brin_doupdate 该函数的主要功能是更新旧元组,有两种场景,在同一页更新还是非同页更新
1 首先判断索引元组是否大于最大索引元组阈值,如果是则打印错误信息并返回false;
2 如果samepage为false,则找到可以容纳待插元组的索引页,并载入内存newbuf且设置extended标识;
1)如果没有可用内存块,则返回false;
2)如果newbuf == oldbuf,说明oldbuf含有足够的空间容纳待插元组;
3 获取oldbuf的缓冲块排它锁,设置extended为false;
4 读取oldbuf对应的数据页oldpage和旧元组对应的item信息;
5 如果oldpage不是常规的索引页/item非法/item状态不正常,则释放排它锁,
1)如果newbuf有效,
1.1如果extended为true,则初始化该newbuf对应的数据页;
1.2释放newbuf上的锁资源(content lock和pin lock);
1.3更新索引表的FSM信息;
2)newbuf无效,返回false;
6 如果旧元组内容发生变化,则先释放oldbuf的缓冲块排它锁;
1)如果newbuf有效,
1.1如果extended为true,则初始化该newbuf对应的数据页;
1.2释放newbuf上的锁资源(content lock和pin lock);
1.3更新索引表的FSM信息;
2)newbuf无效,返回false;
7 如果是同页更新,则;
临界区
1)调用PageIndexTupleOverwrite函数执行更新操作
更新失败,打印错误信息并退出
2)将oldbuf标记为脏,需持有缓冲块排它锁防止其他进程进行刷盘操作,引发写入脏数据的风险;
3)构建XLOG日志(XLOG_BRIN_SAMEPAGE_UPDATE),更新oldpage的lsn信息;
临界区
4)释放oldbuf的缓冲块排它锁;
5)如果newbuf无效,返回false,反之进行如下步骤:
1.1如果extended为true,则初始化该newbuf对应的数据页;
1.2释放newbuf上的锁资源(content lock和pin lock);
1.3更新索引表的FSM信息;
6)返回true;
8 如果newbuf无效,则释放oldbuf的缓冲块排它锁,返回false;
9 步骤7-8均不满足时,则说明oldpage没有空间容纳待插入元组,需要newbuf执行更新插入操作,流程如下:
1)首先获取newabuf对应的索引映射revmappage页并将其载入revmapbuf;
临界区
2)如果extended标识为true,则初始化一个新的索引常规页newpage;
3)紧接着将索引元组从oldpage中删除,将其对应的itemdata设置为unused状态;
4)调用PageAddItem函数将待插索引元组插入newpage中,并返回其item偏移量newoff;
5)如果newoff无效,则报错失败退出;
6)设置oldbuf、newbuf为脏状态;
7)将插入的索引元组物理地址信息封装成TID插入到映射页中,并标记revmapbuf为脏状态;
8)构建XLOG日志,更新oldpage、newpage、revmappage的lsn信息;
临界区
9)释放锁资源,并更新索引表对应的FSM信息,最后返回true。
PageIndexTupleOverwrite 函数
在Brin index页替换指定索引
1 首先进行安全性检查,判断索引页是否损坏或者元组偏移量offnum是否合法;
2 然后计算新元组与旧元组的尺寸大小size_diff,然后重新移动目标元组前的所有元组,并更新offset指针;
3 最后将新索引元组插入到更新后的offset位置。
场景1:删除的元素不是最后一个ItenId指向的索引元组
仅回收索引元组,而ItenidData设置为UNUSED转态
场景2:删除的元素是最后一个ItenId指向的索引元组
ItemidData和索引元组空间都回收
/*
* PageIndexTupleOverwrite
*
* Replace a specified tuple on an index page.
*
* The new tuple is placed exactly where the old one had been, shifting
* other tuples' data up or down as needed to keep the page compacted.
* This is better than deleting and reinserting the tuple, because it
* avoids any data shifting when the tuple size doesn't change; and
* even when it does, we avoid moving the line pointers around.
* Conceivably this could also be of use to an index AM that cares about
* the physical order of tuples as well as their ItemId order.
*
* If there's insufficient space for the new tuple, return false. Other
* errors represent data-corruption problems, so we just elog.
*/
bool
PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
Item newtup, Size newsize)
{
PageHeader phdr = (PageHeader) page;
ItemId tupid;
int oldsize;
unsigned offset;
Size alignednewsize;
int size_diff;
int itemcount;
/*
* As with PageRepairFragmentation, paranoia seems justified.
*/
if (phdr->pd_lower < SizeOfPageHeaderData ||
phdr->pd_lower > phdr->pd_upper ||
phdr->pd_upper > phdr->pd_special ||
phdr->pd_special > BLCKSZ ||
phdr->pd_special != MAXALIGN(phdr->pd_special))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));
itemcount = PageGetMaxOffsetNumber(page);
if ((int) offnum <= 0 || (int) offnum > itemcount)
elog(ERROR, "invalid index offnum: %u", offnum);
tupid = PageGetItemId(page, offnum);
Assert(ItemIdHasStorage(tupid));
oldsize = ItemIdGetLength(tupid);
offset = ItemIdGetOffset(tupid);
if (offset < phdr->pd_upper || (offset + oldsize) > phdr->pd_special ||
offset != MAXALIGN(offset))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted line pointer: offset = %u, size = %u",
offset, (unsigned int) oldsize)));
/*
* Determine actual change in space requirement, check for page overflow.
*/
oldsize = MAXALIGN(oldsize);
alignednewsize = MAXALIGN(newsize);
if (alignednewsize > oldsize + (phdr->pd_upper - phdr->pd_lower))
return false;
/*
* Relocate existing data and update line pointers, unless the new tuple
* is the same size as the old (after alignment), in which case there's
* nothing to do. Notice that what we have to relocate is data before the
* target tuple, not data after, so it's convenient to express size_diff
* as the amount by which the tuple's size is decreasing, making it the
* delta to add to pd_upper and affected line pointers.
*/
size_diff = oldsize - (int) alignednewsize;
if (size_diff != 0)
{
char *addr = (char *) page + phdr->pd_upper;
int i;
/* relocate all tuple data before the target tuple */
memmove(addr + size_diff, addr, offset - phdr->pd_upper);
/* adjust free space boundary pointer */
phdr->pd_upper += size_diff;
/* adjust affected line pointers too */
for (i = FirstOffsetNumber; i <= itemcount; i++)
{
ItemId ii = PageGetItemId(phdr, i);
/* Allow items without storage; currently only BRIN needs that */
if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
ii->lp_off += size_diff;
}
}
/* Update the item's tuple length (other fields shouldn't change) */
ItemIdSetNormal(tupid, offset + size_diff, newsize);
/* Copy new tuple data onto page */
memcpy(PageGetItem(page, tupid), newtup, newsize);
return true;
}