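The local buffer manager provides fast buffer management for temporary tables, which do not need to be WAL-logged or checkpointed. Its API is defined in src/backend/storage/buffer/localbuf.c.
Acquiring a Local Buffer
Initialization
First, the temp_buffers GUC parameter sets the maximum number of temporary buffers a session may use; the corresponding variable is num_temp_buffers. InitLocalBuffers initializes the local buffer cache. Because most queries never touch local buffers, allocating the buffer space itself is deferred until it is needed; this function only creates the buffer headers, the auxiliary arrays, and the lookup hash table. As the accompanying figure shows, local buffers are organized much like shared buffers and are initialized in a similar way. The difference is that every element of LocalBufferDescriptors has a negative buf_id. The code comment explains why: shared buffer ids start at 0, so local ones start at -2, and because BufferDescriptorGetBuffer adds 1 to buf_id, the first local Buffer number is -1.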
static void InitLocalBuffers(void) {
int nbufs = num_temp_buffers;
HASHCTL info;
if (IsParallelWorker()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot access temporary tables during a parallel operation"))); /* Parallel workers can't access data in temporary tables, because they have no visibility into the local buffers of their leader. This is a convenient, low-cost place to provide a backstop check for that. Note that we don't wish to prevent a parallel worker from accessing catalog metadata about a temp table, so checks at higher levels would be inappropriate. */
/* Allocate and zero buffer headers and auxiliary arrays */
LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount) ereport(FATAL,(errcode(ERRCODE_OUT_OF_MEMORY),errmsg("out of memory")));
nextFreeLocalBuf = 0;
for (int i = 0; i < nbufs; i++){ /* initialize fields that need to start off nonzero */
BufferDesc *buf = GetLocalBufferDescriptor(i);
/* negative to indicate local buffer. This is tricky: shared buffers
* start with 0. We have to start with -2. (Note that the routine
* BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
* is -1.) */
buf->buf_id = -i - 2;
/* Intentionally do not initialize the buffer's atomic variable
* (besides zeroing the underlying memory above). That way we get
* errors on platforms without atomics, if somebody (re-)introduces
* atomic operations for local buffers. */
}
/* Create the lookup hash table */
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(LocalBufferLookupEnt);
LocalBufHash = hash_create("Local Buffer Lookup Table",nbufs, &info, HASH_ELEM | HASH_BLOBS);
if (!LocalBufHash) elog(ERROR, "could not initialize local buffer hash table");
NLocBuffer = nbufs; /* Initialization done, mark buffers allocated */
}
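The id arithmetic is easy to get wrong by one, so here is a minimal sketch of the three conversions involved. The helper names are hypothetical and do not exist in PostgreSQL; only the formulas come from the listing above (buf_id = -i - 2, Buffer = buf_id + 1) and from MarkLocalBufferDirty (index = -(buffer + 1)).

```c
#include <assert.h>

/* Hypothetical helpers for illustration; only the arithmetic is taken
 * from the localbuf.c listings in this article. */

/* Array index i -> buf_id stored in the descriptor (-2, -3, -4, ...). */
int local_index_to_buf_id(int i)
{
    assert(i >= 0);
    return -i - 2;
}

/* buf_id -> Buffer number seen by callers (buf_id + 1, so -1, -2, ...). */
int buf_id_to_buffer(int buf_id)
{
    return buf_id + 1;
}

/* Buffer number -> array index, the inverse used when marking dirty. */
int local_buffer_to_index(int buffer)
{
    assert(buffer < 0);     /* local Buffer numbers are always negative */
    return -(buffer + 1);
}
```

Composing the three functions returns the original index, which is what lets a negative Buffer number be turned back into a position in LocalBufferDescriptors and LocalRefCount.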
Acquisition
GetLocalBufferStorage allocates memory for local buffers from LocalBufferContext, a memory context created under TopMemoryContext. The idea of this function is to aggregate our requests for storage so that the memory manager doesn't see a whole lot of relatively small requests. Since a local buffer is never given back once it has been created within a particular process, there is no point in burdening memmgr with separately managed chunks.
static Block GetLocalBufferStorage(void) {
static char *cur_block = NULL;
static int next_buf_in_block = 0;
static int num_bufs_in_block = 0;
static int total_bufs_allocated = 0;
static MemoryContext LocalBufferContext = NULL;
char *this_buf;
if (next_buf_in_block >= num_bufs_in_block) {
if (LocalBufferContext == NULL) LocalBufferContext = AllocSetContextCreate(TopMemoryContext, "LocalBufferContext", ALLOCSET_DEFAULT_SIZES); /* We allocate local buffers in a context of their own, so that the space eaten for them is easily recognizable in MemoryContextStats output. Create the context on first use. */
int num_bufs; /* Need to make a new request to memmgr */
num_bufs = Max(num_bufs_in_block * 2, 16); /* Start with a 16-buffer request; subsequent ones double each time */
num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated); /* But not more than what we need for all remaining local bufs */
num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ); /* And don't overflow MaxAllocSize, either */
cur_block = (char *) MemoryContextAlloc(LocalBufferContext, num_bufs * BLCKSZ);
next_buf_in_block = 0;
num_bufs_in_block = num_bufs;
}
/* Allocate next buffer in current memory block */
this_buf = cur_block + next_buf_in_block * BLCKSZ;
next_buf_in_block++;
total_bufs_allocated++;
return (Block) this_buf;
}
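To see how few requests the doubling rule produces, the following sketch replays only the sizing logic of GetLocalBufferStorage. The function and parameter names are hypothetical; the cap max_per_request stands in for MaxAllocSize / BLCKSZ, and no memory is actually allocated.

```c
#include <assert.h>

/* Hypothetical replay of the sizing rule: start at 16 buffers, double on
 * every later request, and clamp to what is still needed and to a cap. */
int next_request_size(int prev_request, int total_needed,
                      int allocated, int max_per_request)
{
    int n;

    assert(allocated < total_needed && max_per_request > 0);
    n = prev_request * 2;
    if (n < 16)
        n = 16;                              /* first request */
    if (n > total_needed - allocated)
        n = total_needed - allocated;        /* no more than remaining */
    if (n > max_per_request)
        n = max_per_request;                 /* respect the size cap */
    return n;
}

/* Number of separate requests needed to back total_needed buffers. */
int count_requests(int total_needed, int max_per_request)
{
    int allocated = 0, prev = 0, requests = 0;

    while (allocated < total_needed) {
        prev = next_request_size(prev, total_needed, allocated,
                                 max_per_request);
        allocated += prev;
        requests++;
    }
    return requests;
}
```

Under these assumptions, a pool of 1024 buffers is backed by requests of 16, 32, 64, 128, 256, 512 and a final 16 buffers, so the memory manager sees 7 chunks rather than 1024.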
LocalBufferAlloc
Local buffers are initialized through the call chain ReadBufferExtended/ReadBufferWithoutRelcache --> ReadBuffer_common --> LocalBufferAlloc --> InitLocalBuffers. ReadBufferExtended returns a buffer containing the requested block of the requested relation. If the blknum requested is P_NEW, it extends the relation file and allocates a new block. (The caller is responsible for ensuring that only one backend tries to extend a relation at the same time!) It returns the buffer number of the buffer containing the block read, and the returned buffer has been pinned. It does not return on error; it calls elog instead. The function assumes that reln has already been opened when it is called.
The mode parameter (ReadBufferMode) takes the following values.
In RBM_NORMAL mode, the page is read from disk, and the page header is validated. An error is thrown if the page header is not valid. (But note that an all-zero page is considered "valid"; see PageIsVerifiedExtended().)
RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not valid, the page is zeroed instead of throwing an error. This is intended for non-critical data, where the caller is prepared to repair errors.
In RBM_ZERO_AND_LOCK mode, if the page isn't in the buffer cache already, it is filled with zeros instead of being read from disk. This is useful when the caller is going to fill the page from scratch, since it saves I/O and avoids unnecessary failure if the on-disk page has a corrupt page header. The page is returned locked to ensure that the caller has a chance to initialize it before it is made visible to others. Caution: do not use this mode to read a page that is beyond the relation's current physical EOF; that is likely to cause problems in md.c when the page is modified and written out. P_NEW is OK, though.
RBM_ZERO_AND_CLEANUP_LOCK is the same as RBM_ZERO_AND_LOCK, but acquires a cleanup-strength lock on the page.
RBM_NORMAL_NO_LOG mode is treated the same as RBM_NORMAL here.
If strategy is not NULL, a nondefault buffer access strategy is used.
Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy) {
bool hit;
RelationOpenSmgr(reln); /* Open it at the smgr level if not already done */
if (RELATION_IS_OTHER_TEMP(reln)) ereport(ERROR,(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot access temporary tables of other sessions"))); /* Reject attempts to read non-local temporary relations; we would be likely to get wrong data since we have no visibility into the owning session's local buffers. */
pgstat_count_buffer_read(reln); /* Read the buffer, and update pgstat counters to reflect a cache hit or miss. */
Buffer buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence, forkNum, blockNum, mode, strategy, &hit);
if (hit) pgstat_count_buffer_hit(reln);
return buf;
}
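ReadBufferWithoutRelcache is like ReadBufferExtended, but does not require a relcache entry. Note that the version shown here opens the relation with InvalidBackendId and passes RELPERSISTENCE_PERMANENT to ReadBuffer_common, so this entry point appears to be aimed at permanent relations rather than a session's own temporary tables.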
Buffer ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy) {
bool hit;
SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum, mode, strategy, &hit);
}
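LocalBufferAlloc finds or creates a local buffer for the given page of the given relation. The API is similar to bufmgr.c's BufferAlloc, except that no locking is needed since everything is local to the backend. IO_IN_PROGRESS is never set, and only the default access strategy is currently supported. LocalBufferAlloc proceeds as follows: (1) initialize the BufferTag for the requested block; (2) call InitLocalBuffers if this is the first request in the session; (3) look the tag up in LocalBufHash to determine whether the buffer already exists; (4) if it exists, use the descriptor index stored in the LocalBufferLookupEnt to locate the element of the LocalBufferDescriptors array, bump the usage count if the buffer was not already pinned, and increment LocalRefCount, which plays the role that PinBuffer plays for shared buffers; (5) if it does not exist, pick a victim buffer with the clock sweep algorithm. The victim is not referenced, but it might still be dirty; if so, it is written out before being reused. Finally the old hash entry (if any) is removed, a new entry is inserted, and the descriptor's tag and state are reset.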
BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, bool *foundPtr){
BufferTag newTag; /* identity of requested block */
INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
if (LocalBufHash == NULL) InitLocalBuffers(); /* Initialize local buffers if first request in this session */
/* See if the desired buffer already exists */
LocalBufferLookupEnt *hresult = (LocalBufferLookupEnt *)hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);
BufferDesc *bufHdr; int b; uint32 buf_state;
if (hresult){
b = hresult->id;
bufHdr = GetLocalBufferDescriptor(b);
buf_state = pg_atomic_read_u32(&bufHdr->state);
/* this part is equivalent to PinBuffer for a shared buffer */
if (LocalRefCount[b] == 0){
if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT){
buf_state += BUF_USAGECOUNT_ONE;
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}
}
LocalRefCount[b]++;
ResourceOwnerRememberBuffer(CurrentResourceOwner,BufferDescriptorGetBuffer(bufHdr));
if (buf_state & BM_VALID) *foundPtr = true;
else{/* Previous read attempt must have failed; try again */
*foundPtr = false;
}
return bufHdr;
}
/* Need to get a new buffer. We use a clock sweep algorithm (essentially the same as what freelist.c does now...) */
int trycounter = NLocBuffer;
for (;;){
b = nextFreeLocalBuf;
if (++nextFreeLocalBuf >= NLocBuffer) nextFreeLocalBuf = 0; /* reached the end of the array, wrap around to the start */
bufHdr = GetLocalBufferDescriptor(b);
if (LocalRefCount[b] == 0){ /* not pinned, so this buffer is a candidate */
buf_state = pg_atomic_read_u32(&bufHdr->state);
if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0){
buf_state -= BUF_USAGECOUNT_ONE; /* decrement the usage count kept in the descriptor's state */
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
trycounter = NLocBuffer;
}else{/* Found a usable buffer: its usage count is zero */
LocalRefCount[b]++;
ResourceOwnerRememberBuffer(CurrentResourceOwner,BufferDescriptorGetBuffer(bufHdr));
break;
}
}
else if (--trycounter == 0) ereport(ERROR,(errcode(ERRCODE_INSUFFICIENT_RESOURCES),errmsg("no empty local buffer available")));
}
if (buf_state & BM_DIRTY){ /* this buffer is not referenced but it might still be dirty. if that's the case, write it out before reusing it! */
SMgrRelation oreln;
Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
oreln = smgropen(bufHdr->tag.rnode, MyBackendId); /* Find smgr relation for buffer */
PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
/* And write... */
smgrwrite(oreln, bufHdr->tag.forkNum, bufHdr->tag.blockNum, localpage, false);
/* Mark not-dirty now in case we error out below */
buf_state &= ~BM_DIRTY;
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
pgBufferUsage.local_blks_written++;
}
/* lazy memory allocation: allocate space on first use of a buffer. */
if (LocalBufHdrGetBlock(bufHdr) == NULL){ /* Set pointer for use by BufferGetBlock() macro */
LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
}
/* Update the hash table: remove old entry, if any, and make new one. */
if (buf_state & BM_TAG_VALID){
hresult = (LocalBufferLookupEnt *)hash_search(LocalBufHash, (void *) &bufHdr->tag,HASH_REMOVE, NULL);
if (!hresult) elog(ERROR, "local buffer hash table corrupted");
/* mark buffer invalid just in case hash insert fails */
CLEAR_BUFFERTAG(bufHdr->tag);
buf_state &= ~(BM_VALID | BM_TAG_VALID);
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}
bool found;
hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &newTag, HASH_ENTER, &found);
if (found) /* shouldn't happen */
elog(ERROR, "local buffer hash table corrupted");
hresult->id = b;
/* it's all ours now. */
bufHdr->tag = newTag;
buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
buf_state |= BM_TAG_VALID;
buf_state &= ~BUF_USAGECOUNT_MASK;
buf_state += BUF_USAGECOUNT_ONE;
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
*foundPtr = false;
return bufHdr;
}
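The victim search in the second half of LocalBufferAlloc can be studied in isolation. The sketch below is a toy model of that loop, not PostgreSQL code: ToyPool, TOY_NBUFFERS and toy_clock_sweep are hypothetical names, the usage count is kept in a plain int array instead of the packed state word, and the "no empty local buffer available" error is modeled as a return value of -1.

```c
#include <assert.h>

#define TOY_NBUFFERS 4              /* hypothetical pool size */

typedef struct ToyPool {
    int refcount[TOY_NBUFFERS];     /* plays the role of LocalRefCount */
    int usage[TOY_NBUFFERS];        /* plays the role of the usage count */
    int next_free;                  /* plays the role of nextFreeLocalBuf */
} ToyPool;

/* Returns the index of the chosen victim (now pinned), or -1 when every
 * buffer is pinned and a full pass found nothing to evict. */
int toy_clock_sweep(ToyPool *pool)
{
    int trycounter = TOY_NBUFFERS;

    assert(pool != 0);
    for (;;) {
        int b = pool->next_free;

        /* advance the clock hand, wrapping at the end of the array */
        if (++pool->next_free >= TOY_NBUFFERS)
            pool->next_free = 0;

        if (pool->refcount[b] == 0) {
            if (pool->usage[b] > 0) {
                pool->usage[b]--;           /* give it another chance */
                trycounter = TOY_NBUFFERS;  /* progress was made */
            } else {
                pool->refcount[b]++;        /* pin the victim */
                return b;
            }
        } else if (--trycounter == 0) {
            return -1;                      /* everything is pinned */
        }
    }
}
```

Starting from usage counts {2, 1, 0, 1} with nothing pinned and the hand at 0, the first call decrements buffers 0 and 1 and returns buffer 2. Recently used buffers therefore survive roughly one extra pass of the hand per usage count, while pinned buffers are never evicted.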
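Prefetch
LocalPrefetchBuffer initiates an asynchronous read of a block of a relation. Its call stack is PrefetchBuffer --> LocalPrefetchBuffer. If the block is already present in the local buffer hash table, there is nothing to do; otherwise the request is handed to smgrprefetch.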
void LocalPrefetchBuffer(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum) {
#ifdef USE_PREFETCH
BufferTag newTag; /* identity of requested block */
LocalBufferLookupEnt *hresult;
INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
if (LocalBufHash == NULL) InitLocalBuffers(); /* Initialize local buffers if first request in this session */
/* See if the desired buffer already exists */
hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);
if (hresult){ /* Yes, so nothing to do */ return; }
/* Not in buffers, so initiate prefetch */
smgrprefetch(smgr, forkNum, blockNum);
#endif /* USE_PREFETCH */
}
Deleting Local Buffers
MarkLocalBufferDirty marks a local buffer as dirty. Its call stack is MarkBufferDirty/MarkBufferDirtyHint --> MarkLocalBufferDirty.
void MarkLocalBufferDirty(Buffer buffer) {
int bufid = -(buffer + 1);
BufferDesc *bufHdr = GetLocalBufferDescriptor(bufid);
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
if (!(buf_state & BM_DIRTY)) pgBufferUsage.local_blks_dirtied++;
buf_state |= BM_DIRTY;
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}
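MarkLocalBufferDirty relies on the flags and the usage count sharing a single 32-bit state word, and it counts a buffer as "dirtied" only on the clean-to-dirty transition. The sketch below illustrates both points. The bit positions and all TOY_ and toy_ names are assumptions made for this example; the authoritative definitions are in PostgreSQL's buffer headers and may differ.

```c
#include <assert.h>
#include <stdint.h>

/* Illustrative bit layout only (an assumption for this sketch). */
#define TOY_USAGECOUNT_SHIFT 18
#define TOY_USAGECOUNT_ONE   (UINT32_C(1) << TOY_USAGECOUNT_SHIFT)
#define TOY_USAGECOUNT_MASK  (UINT32_C(0xF) << TOY_USAGECOUNT_SHIFT)
#define TOY_FLAG_DIRTY       (UINT32_C(1) << 23)

/* Set the dirty flag; bump *dirtied only when the buffer was clean. */
void toy_mark_dirty(uint32_t *state, long *dirtied)
{
    assert(state != 0 && dirtied != 0);
    if (!(*state & TOY_FLAG_DIRTY))
        (*dirtied)++;
    *state |= TOY_FLAG_DIRTY;
}

/* Extract the usage count stored alongside the flags. */
uint32_t toy_usage_count(uint32_t state)
{
    return (state & TOY_USAGECOUNT_MASK) >> TOY_USAGECOUNT_SHIFT;
}
```

Marking the same buffer dirty twice increments the counter once, and setting the flag leaves the usage count bits untouched, which is why the real function can use a simple read, OR, and unlocked write.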
DropRelFileNodeLocalBuffers removes from the buffer pool all the pages of the specified relation fork that have block numbers >= firstDelBlock. (In particular, with firstDelBlock = 0, all pages are removed.) Dirty pages are simply dropped, without bothering to write them out first. Therefore, this is NOT rollback-able, and so should be used only with extreme caution! Its call stack is DropRelFileNodeBuffers --> DropRelFileNodeLocalBuffers.
void DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, BlockNumber firstDelBlock) {
for (int i = 0; i < NLocBuffer; i++){
BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
LocalBufferLookupEnt *hresult;
if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock){
if (LocalRefCount[i] != 0) elog(ERROR, "block %u of %s is still referenced (local %u)",bufHdr->tag.blockNum,relpathbackend(bufHdr->tag.rnode, MyBackendId,bufHdr->tag.forkNum), LocalRefCount[i]);
/* Remove entry from hashtable */
hresult = (LocalBufferLookupEnt *)hash_search(LocalBufHash, (void *) &bufHdr->tag,HASH_REMOVE, NULL);
if (!hresult) /* shouldn't happen */
elog(ERROR, "local buffer hash table corrupted");
/* Mark buffer invalid */
CLEAR_BUFFERTAG(bufHdr->tag);
buf_state &= ~BUF_FLAG_MASK;
buf_state &= ~BUF_USAGECOUNT_MASK;
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}
}
}
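The matching rule in this loop (same relation, same fork, block number at or beyond firstDelBlock, and not pinned) can be exercised with a small stand-alone model. ToyBuf and toy_drop_buffers are hypothetical; a single integer stands in for RelFileNode, a boolean stands in for BM_TAG_VALID, and the elog(ERROR) for a still-referenced block is modeled as a return value of -1.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct ToyBuf {
    bool     tag_valid;   /* stands in for BM_TAG_VALID */
    uint32_t rel;         /* stands in for tag.rnode */
    int      fork;        /* stands in for tag.forkNum */
    uint32_t block;       /* stands in for tag.blockNum */
    int      refcount;    /* stands in for LocalRefCount[i] */
} ToyBuf;

/* Invalidate every matching buffer without writing it out. Returns the
 * number of buffers dropped, or -1 if a matching buffer is still pinned. */
int toy_drop_buffers(ToyBuf *bufs, int nbufs, uint32_t rel, int fork,
                     uint32_t first_del_block)
{
    int dropped = 0;

    assert(bufs != 0 && nbufs >= 0);
    for (int i = 0; i < nbufs; i++) {
        ToyBuf *buf = &bufs[i];

        if (buf->tag_valid && buf->rel == rel && buf->fork == fork &&
            buf->block >= first_del_block) {
            if (buf->refcount != 0)
                return -1;          /* still referenced */
            buf->tag_valid = false; /* dirty contents are discarded */
            dropped++;
        }
    }
    return dropped;
}
```

Calling it with first_del_block = 0 drops every cached page of that fork, which corresponds to the truncate-to-zero case described above; a larger value keeps the leading blocks cached.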
DropRelFileNodeAllLocalBuffers removes from the buffer pool all pages of all forks of the specified relation. Its call stack is DropRelFileNodesAllBuffers --> DropRelFileNodeAllLocalBuffers.
void DropRelFileNodeAllLocalBuffers(RelFileNode rnode) {
for (int i = 0; i < NLocBuffer; i++){
BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
LocalBufferLookupEnt *hresult;
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode)){
if (LocalRefCount[i] != 0) elog(ERROR, "block %u of %s is still referenced (local %u)",bufHdr->tag.blockNum, relpathbackend(bufHdr->tag.rnode, MyBackendId,bufHdr->tag.forkNum), LocalRefCount[i]);
/* Remove entry from hashtable */
hresult = (LocalBufferLookupEnt *)hash_search(LocalBufHash, (void *) &bufHdr->tag,HASH_REMOVE, NULL);
if (!hresult)/* shouldn't happen */ elog(ERROR, "local buffer hash table corrupted");
/* Mark buffer invalid */
CLEAR_BUFFERTAG(bufHdr->tag);
buf_state &= ~BUF_FLAG_MASK;
buf_state &= ~BUF_USAGECOUNT_MASK;
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}
}
}