上一篇讲解了brin index的基本概念以及页布局postgres源码解析54 Brin Index–1,后续会从源码角度对索引的构建、维护等方面进行深入讲解。
1 关键数据结构
2 brinbuild执行流程图
3 brinbuild 函数详解
1 首先调用brin_matepage_init初始化brin meta元数据页,并构造对应的XLOG日志填充入至WAL buffer中;
2 紧接着调用brinRevmapInitialize初始化brin revmap映射页、BrinBuildState结构体用于记录后续brin tuple状态信息;
3 按heap表物理块的顺序扫描,构造对应的brin index 元组信息,元组的构造流程由回调函数brinbuildCallback实现;
4 调用form_and_insert_tuple将索引元组插入brin regular常规页中,同时将此元组的TID信息记录至brin revmap映射页中;
5 为此插入动作构造XLOG日志并插入至WAL buffer中;
6 最后释放锁资源,如发生页扩展情况需更新对应的FSM信息;
* brinbuild() -- build a new BRIN index.
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
IndexBuildResult *result;
double reltuples;
double idxtuples;
BrinRevmap *revmap;
BrinBuildState *state;
Buffer meta;
BlockNumber pagesPerRange;
* We expect to be called exactly once for any index relation.
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "index \"%s\" already contains data",
* Critical section not required, because on error the creation of the
* whole relation will be rolled back.
meta = ReadBuffer(index, P_NEW);
Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
if (RelationNeedsWAL(index))
xl_brin_createidx xlrec;
XLogRecPtr recptr;
Page page;
xlrec.version = BRIN_CURRENT_VERSION;
xlrec.pagesPerRange = BrinGetPagesPerRange(index);
XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
page = BufferGetPage(meta);
PageSetLSN(page, recptr);
* Initialize our state, including the deformed tuple state.
revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
state = initialize_brin_buildstate(index, revmap, pagesPerRange);
* Now scan the relation. No syncscan allowed here because we want the
* heap blocks in physical order.
reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
brinbuildCallback, (void *) state, NULL);
/* process the final batch */
/* release resources */
idxtuples = state->bs_numtuples;
* Return statistics
result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
result->heap_tuples = reltuples;
result->index_tuples = idxtuples;
return result;
4 brin_form_tuple
brin tuple在内存中形式为BrinMemTuple,磁盘形式为BrinTuple,因此在写入磁盘前需要将BrinMemTuple转换成BrinTuple;其执行流程为:
1 首先根据brdesc->bd_totalstored为values、nulls、phony_nullbitmap与untoasted_values数组申请内存空间;
2 遍历brdesc->bd_tupdesc->natts属性,检查tuple是否存在空值,如果存在的需要将在nulls数组的对应元素置为true;
3 后续依次将tuple中的数据读出,并填充至values数组中;
4 遍历完brin index所有属性后,开始计算磁盘形式Brin index的长度lens;
5 申请大小为lens的内存空间rettuple,填充rettuple->bt_blkno与rettuple->bt_info属性,后续调用heap_fill_tuple将values数组中的数值依次填充至rettuple的数据域区;
6 后续填充bitmap区域,设置null 位码;
7 最后更新bt_info标识信息,返回rettuple地址。
* Generate a new on-disk tuple to be inserted in a BRIN index.
* See brin_form_placeholder_tuple if you touch this.
BrinTuple *
brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple,
Size *size)
Datum *values;
bool *nulls;
bool anynulls = false;
BrinTuple *rettuple;
int keyno;
int idxattno;
uint16 phony_infomask = 0;
bits8 *phony_nullbitmap;
Size len,
int i;
Datum *untoasted_values;
int nuntoasted = 0;
Assert(brdesc->bd_totalstored > 0);
values = (Datum *) palloc(sizeof(Datum) * brdesc->bd_totalstored);
nulls = (bool *) palloc0(sizeof(bool) * brdesc->bd_totalstored);
phony_nullbitmap = (bits8 *)
palloc(sizeof(bits8) * BITMAPLEN(brdesc->bd_totalstored));
untoasted_values = (Datum *) palloc(sizeof(Datum) * brdesc->bd_totalstored);
* Set up the values/nulls arrays for heap_fill_tuple
idxattno = 0;
for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
int datumno;
* "allnulls" is set when there's no nonnull value in any row in the
* column; when this happens, there is no data to store. Thus set the
* nullable bits for all data elements of this column and we're done.
if (tuple->bt_columns[keyno].bv_allnulls)
for (datumno = 0;
datumno < brdesc->bd_info[keyno]->oi_nstored;
nulls[idxattno++] = true;
anynulls = true;
* The "hasnulls" bit is set when there are some null values in the
* data. We still need to store a real value, but the presence of
* this means we need a null bitmap.
if (tuple->bt_columns[keyno].bv_hasnulls)
anynulls = true;
* Now obtain the values of each stored datum. Note that some values
* might be toasted, and we cannot rely on the original heap values
* sticking around forever, so we must detoast them. Also try to
* compress them.
for (datumno = 0;
datumno < brdesc->bd_info[keyno]->oi_nstored;
Datum value = tuple->bt_columns[keyno].bv_values[datumno];
/* We must look at the stored type, not at the index descriptor. */
TypeCacheEntry *atttype = brdesc->bd_info[keyno]->oi_typcache[datumno];
/* Do we need to free the value at the end? */
bool free_value = false;
/* For non-varlena types we don't need to do anything special */
if (atttype->typlen != -1)
values[idxattno++] = value;
* Do nothing if value is not of varlena type. We don't need to
* care about NULL values here, thanks to bv_allnulls above.
* If value is stored EXTERNAL, must fetch it so we are not
* depending on outside storage.
* XXX Is this actually true? Could it be that the summary is
* NULL even for range with non-NULL data? E.g. degenerate bloom
* filter may be thrown away, etc.
if (VARATT_IS_EXTERNAL(DatumGetPointer(value)))
value = PointerGetDatum(heap_tuple_fetch_attr((struct varlena *)
free_value = true;
* If value is above size target, and is of a compressible datatype,
* try to compress it in-line.
if (!VARATT_IS_EXTENDED(DatumGetPointer(value)) &&
VARSIZE(DatumGetPointer(value)) > TOAST_INDEX_TARGET &&
(atttype->typstorage == 'x' || atttype->typstorage == 'm'))
Datum cvalue = toast_compress_datum(value);
if (DatumGetPointer(cvalue) != NULL)
/* successful compression */
if (free_value)
value = cvalue;
free_value = true;
* If we untoasted / compressed the value, we need to free it
* after forming the index tuple.
if (free_value)
untoasted_values[nuntoasted++] = value;
values[idxattno++] = value;
/* Assert we did not overrun temp arrays */
Assert(idxattno <= brdesc->bd_totalstored);
/* compute total space needed */
len = SizeOfBrinTuple;
if (anynulls)
* We need a double-length bitmap on an on-disk BRIN index tuple; the
* first half stores the "allnulls" bits, the second stores
* "hasnulls".
len += BITMAPLEN(brdesc->bd_tupdesc->natts * 2);
len = hoff = MAXALIGN(len);
data_len = heap_compute_data_size(brtuple_disk_tupdesc(brdesc),
values, nulls);
len += data_len;
len = MAXALIGN(len);
rettuple = palloc0(len);
rettuple->bt_blkno = blkno;
rettuple->bt_info = hoff;
/* Assert that hoff fits in the space available */
Assert((rettuple->bt_info & BRIN_OFFSET_MASK) == hoff); // BRIN_OFFSET_MASK= 0x1F (00011111)
* The infomask and null bitmap as computed by heap_fill_tuple are useless
* to us. However, that function will not accept a null infomask; and we
* need to pass a valid null bitmap so that it will correctly skip
* outputting null attributes in the data area.
(char *) rettuple + hoff,
/* done with these */
for (i = 0; i < nuntoasted; i++)
* Now fill in the real null bitmasks. allnulls first.
if (anynulls)
bits8 *bitP;
int bitmask;
rettuple->bt_info |= BRIN_NULLS_MASK;
* Note that we reverse the sense of null bits in this module: we
* store a 1 for a null attribute rather than a 0. So we must reverse
* the sense of the att_isnull test in brin_deconstruct_tuple as well.
bitP = ((bits8 *) ((char *) rettuple + SizeOfBrinTuple)) - 1;
bitmask = HIGHBIT;
for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
if (bitmask != HIGHBIT)
bitmask <<= 1;
bitP += 1;
*bitP = 0x0;
bitmask = 1;
if (!tuple->bt_columns[keyno].bv_allnulls)
*bitP |= bitmask;
/* hasnulls bits follow */
for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
if (bitmask != HIGHBIT)
bitmask <<= 1;
bitP += 1;
*bitP = 0x0;
bitmask = 1;
if (!tuple->bt_columns[keyno].bv_hasnulls)
*bitP |= bitmask;
bitP = ((bits8 *) (rettuple + SizeOfBrinTuple)) - 1;
if (tuple->bt_placeholder)
rettuple->bt_info |= BRIN_PLACEHOLDER_MASK;
*size = len;
return rettuple;
5 brin_doinsert 函数
经过函数brin_form_tuple对内存形式BrinMemTuple加工生成磁盘形式Brin tuple,后进入真正的插入操作,由brin_doinsert函数实现。执行流程如下:
1 首先进行安全检查,判断待插入元组大小是否超过单个brin index元组最大阈值,如果是则写错误日志信息,返回InvalidOffsetNumber;
2 确保索引条目所在heap页域是否对应当前映射页,如果不对应,需进行扩展或者重用某映射页;
3 如果待插入常规页所在缓冲块有效,获取缓冲块排它锁;检查此常规页是否有足够空间容纳待插入索引,不能的话则释放缓冲块排它锁,将缓冲块号*buffer置为InvalidBuffer;
4 如果缓冲块号为InvalidBuffer,则循环调用brin_getinsertbuffer函数找到可用的brin buffer;
5 对步骤2中的revmap buffer施加缓冲块排它锁;
6 获取步骤4中brin buffer对应常规页地址page和常规页块号blk;
7 调用PageAddItem函数向page中插入索引元组,返回其偏移量off;
8 将上述blk与off信息封装成TID插入revmap映射页中,并为此操作生成对应的XLOG;
9 释放锁资源,如果发生brin index常规页扩展情况,需要更新对应的FSM信息。
brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
BrinRevmap *revmap, Buffer *buffer, BlockNumber heapBlk,
BrinTuple *tup, Size itemsz)
Page page;
BlockNumber blk;
OffsetNumber off;
Size freespace = 0;
Buffer revmapbuf;
ItemPointerData tid;
bool extended;
Assert(itemsz == MAXALIGN(itemsz));
/* If the item is oversized, don't even bother. */
if (itemsz > BrinMaxItemSize)
errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
itemsz, BrinMaxItemSize, RelationGetRelationName(idxrel))));
return InvalidOffsetNumber; /* keep compiler quiet */
/* Make sure the revmap is long enough to contain the entry we need */
brinRevmapExtend(revmap, heapBlk);
* Acquire lock on buffer supplied by caller, if any. If it doesn't have
* enough space, unpin it to obtain a new one below.
if (BufferIsValid(*buffer))
* It's possible that another backend (or ourselves!) extended the
* revmap over the page we held a pin on, so we cannot assume that
* it's still a regular page.
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
if (br_page_get_freespace(BufferGetPage(*buffer)) < itemsz)
*buffer = InvalidBuffer;
* If we still don't have a usable buffer, have brin_getinsertbuffer
* obtain one for us.
if (!BufferIsValid(*buffer))
*buffer = brin_getinsertbuffer(idxrel, InvalidBuffer, itemsz, &extended);
while (!BufferIsValid(*buffer));
extended = false;
/* Now obtain lock on revmap buffer */
revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
page = BufferGetPage(*buffer);
blk = BufferGetBlockNumber(*buffer);
/* Execute the actual insertion */
if (extended)
brin_page_init(page, BRIN_PAGETYPE_REGULAR);
off = PageAddItem(page, (Item) tup, itemsz, InvalidOffsetNumber,
false, false);
if (off == InvalidOffsetNumber)
elog(ERROR, "failed to add BRIN tuple to new page");
/* needed to update FSM below */
if (extended)
freespace = br_page_get_freespace(page);
ItemPointerSet(&tid, blk, off);
brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, tid);
/* XLOG stuff */
if (RelationNeedsWAL(idxrel))
xl_brin_insert xlrec;
XLogRecPtr recptr;
uint8 info;
info = XLOG_BRIN_INSERT | (extended ? XLOG_BRIN_INIT_PAGE : 0);
xlrec.heapBlk = heapBlk;
xlrec.pagesPerRange = pagesPerRange;
xlrec.offnum = off;
XLogRegisterData((char *) &xlrec, SizeOfBrinInsert);
XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD | (extended ? REGBUF_WILL_INIT : 0));
XLogRegisterBufData(0, (char *) tup, itemsz);
XLogRegisterBuffer(1, revmapbuf, 0);
recptr = XLogInsert(RM_BRIN_ID, info);
PageSetLSN(page, recptr);
PageSetLSN(BufferGetPage(revmapbuf), recptr);
/* Tuple is firmly on buffer; we can release our locks */
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);
BRIN_elog((DEBUG2, "inserted tuple (%u,%u) for range starting at %u",
blk, off, heapBlk));
if (extended)
RecordPageWithFreeSpace(idxrel, blk, freespace);
FreeSpaceMapVacuumRange(idxrel, blk, blk + 1);
return off;