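Overall message-handling flow
- After a bgworker sends a message and signals the leader, the leader process sets ParallelMessagePending to true. At the next CHECK_FOR_INTERRUPTS(), the leader enters the message-handling logic: HandleParallelMessages.
- That logic first walks every ParallelContext that currently exists. Each ParallelContext is created by CreateParallelContext and represents one unit of parallel work. For example, vacuum has one ParallelContext, and a parallel execution plan also creates one.
- Each ParallelContext manages several parallel bgworker processes, and each bgworker has a shared-memory mq set up for it in advance, the error_mqh, stored in ParallelContext->worker[i].error_mqh.
- The leader then walks each worker's mq and checks whether it holds a message. Any message found is handled by the HandleParallelMessage logic below.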
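The flow above can be modelled with a small, self-contained sketch. Everything prefixed with `Toy` or `toy_` is hypothetical and exists only for illustration: the queue is a plain in-process ring buffer rather than a real shm_mq, and the pending flag is set directly instead of from a signal handler.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_WORKERS 4
#define TOY_QUEUE_CAP   8

/* Hypothetical stand-in for one worker's error queue (not a real shm_mq). */
typedef struct ToyQueue
{
    char msgs[TOY_QUEUE_CAP];   /* one byte per message: its type code */
    int  head;                  /* count of messages already consumed */
    int  tail;                  /* count of messages ever enqueued */
} ToyQueue;

/* Hypothetical stand-in for ParallelContext. */
typedef struct ToyContext
{
    int                nworkers;
    ToyQueue           queue[TOY_MAX_WORKERS];
    struct ToyContext *next;    /* next live context, or NULL */
} ToyContext;

/* Plays the role of ParallelMessagePending in this sketch. */
volatile bool toy_message_pending = false;

/* Worker side: enqueue one message, then mark the leader as having work. */
bool
toy_send(ToyContext *cxt, int worker, char msgtype)
{
    ToyQueue *q;

    assert(worker >= 0 && worker < cxt->nworkers);
    q = &cxt->queue[worker];
    if (q->tail - q->head >= TOY_QUEUE_CAP)
        return false;           /* queue full */
    q->msgs[q->tail % TOY_QUEUE_CAP] = msgtype;
    q->tail++;
    toy_message_pending = true;
    return true;
}

/*
 * Leader side: called at a safe point, in the spirit of
 * CHECK_FOR_INTERRUPTS(). Returns how many messages were drained.
 */
int
toy_check_for_interrupts(ToyContext *all_contexts)
{
    int handled = 0;

    if (!toy_message_pending)
        return 0;
    toy_message_pending = false;

    /* Walk every context, then every worker queue inside it. */
    for (ToyContext *cxt = all_contexts; cxt != NULL; cxt = cxt->next)
    {
        for (int i = 0; i < cxt->nworkers; i++)
        {
            ToyQueue *q = &cxt->queue[i];

            while (q->head < q->tail)
            {
                char msgtype = q->msgs[q->head % TOY_QUEUE_CAP];

                q->head++;
                (void) msgtype; /* a real handler would dispatch on this */
                handled++;
            }
        }
    }
    return handled;
}
```

The point of the sketch is the two-level loop (contexts, then workers) guarded by a single pending flag, so the leader pays almost nothing when no worker has sent anything.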
Message-handling logic
static void
HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
    char        msgtype;

    if (pcxt->known_attached_workers != NULL &&
        !pcxt->known_attached_workers[i])
    {
        pcxt->known_attached_workers[i] = true;
        pcxt->nknown_attached_workers++;
    }

    msgtype = pq_getmsgbyte(msg);

    switch (msgtype)
    {
        case 'K':               /* BackendKeyData */
            {
                int32       pid = pq_getmsgint(msg, 4);

                (void) pq_getmsgint(msg, 4);    /* discard cancel key */
                (void) pq_getmsgend(msg);
                pcxt->worker[i].pid = pid;
                break;
            }

        case 'E':               /* ErrorResponse */
        case 'N':               /* NoticeResponse */
            {
                ErrorData   edata;
                ErrorContextCallback *save_error_context_stack;

                /* Parse ErrorResponse or NoticeResponse. */
                pq_parse_errornotice(msg, &edata);

                /* Death of a worker isn't enough justification for suicide. */
                edata.elevel = Min(edata.elevel, ERROR);

                /*
                 * If desired, add a context line to show that this is a
                 * message propagated from a parallel worker.  Otherwise, it
                 * can sometimes be confusing to understand what actually
                 * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
                 * because it causes test-result instability depending on
                 * whether a parallel worker is actually used or not.)
                 */
                if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
                {
                    if (edata.context)
                        edata.context = psprintf("%s\n%s", edata.context,
                                                 _("parallel worker"));
                    else
                        edata.context = pstrdup(_("parallel worker"));
                }

                /*
                 * Context beyond that should use the error context callbacks
                 * that were in effect when the ParallelContext was created,
                 * not the current ones.
                 */
                save_error_context_stack = error_context_stack;
                error_context_stack = pcxt->error_context_stack;

                /* Rethrow error or print notice. */
                ThrowErrorData(&edata);

                /* Not an error, so restore previous context stack. */
                error_context_stack = save_error_context_stack;

                break;
            }

        case 'A':               /* NotifyResponse */
            {
                /* Propagate NotifyResponse. */
                int32       pid;
                const char *channel;
                const char *payload;

                pid = pq_getmsgint(msg, 4);
                channel = pq_getmsgrawstring(msg);
                payload = pq_getmsgrawstring(msg);
                pq_endmessage(msg);

                NotifyMyFrontEnd(channel, payload, pid);

                break;
            }

        case 'X':               /* Terminate, indicating clean exit */
            {
                shm_mq_detach(pcxt->worker[i].error_mqh);
                pcxt->worker[i].error_mqh = NULL;
                break;
            }

        default:
            {
                elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
                     msgtype, msg->len);
            }
    }
}
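To make the 'K' branch more concrete, here is a minimal standalone sketch of decoding such a message from a raw byte buffer. It assumes the layout implied by the handler above: one type byte followed by two 4-byte integers in network (big-endian) byte order. `ToyKeyData`, `toy_read_be32` and `toy_parse_key_message` are hypothetical names for this illustration, not PostgreSQL functions.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result type for this sketch; not a PostgreSQL struct. */
typedef struct ToyKeyData
{
    int32_t pid;
    int32_t cancel_key;
} ToyKeyData;

/* Read a 4-byte big-endian (network order) unsigned integer. */
uint32_t
toy_read_be32(const unsigned char *p)
{
    return ((uint32_t) p[0] << 24) |
           ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) |
           (uint32_t) p[3];
}

/*
 * Parse a 'K' message: 1 type byte + 4-byte pid + 4-byte cancel key.
 * Returns false if the type byte or the length does not match, which
 * mirrors the "consume everything, then check for the end" style of
 * the handler above.
 */
bool
toy_parse_key_message(const unsigned char *buf, size_t len, ToyKeyData *out)
{
    if (len != 9 || buf[0] != 'K')
        return false;
    out->pid = (int32_t) toy_read_be32(buf + 1);
    out->cancel_key = (int32_t) toy_read_be32(buf + 5);
    return true;
}
```

In the real handler the cancel key is read and thrown away; the sketch keeps it only so that both fields of the payload are visible.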
Where are the messages sent from?
Every bgworker enters the ParallelWorkerMain function, which sends different messages depending on how execution goes.
For example:
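void
ParallelWorkerMain(Datum main_arg)
{
    StringInfoData msgbuf;

    /* ... setup omitted ... */

    /* Send 'K': tell the leader process this worker's PID. */
    pq_beginmessage(&msgbuf, 'K');
    pq_sendint32(&msgbuf, (int32) MyProcPid);
    pq_sendint32(&msgbuf, (int32) MyCancelKey);
    pq_endmessage(&msgbuf);

    /* ... the worker's actual work omitted ... */

    /* Send 'X': tell the leader process this worker exited cleanly. */
    pq_putmessage('X', NULL, 0);
}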
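As a rough picture of what the 'K' send amounts to on the wire, the sketch below builds the same bytes by hand: a type byte, then the PID and the cancel key as 4-byte big-endian integers. `toy_write_be32` and `toy_build_key_message` are hypothetical helpers for this illustration; the real code goes through the pq_* buffer routines shown above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Write a 4-byte unsigned integer in big-endian (network) order. */
void
toy_write_be32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char) (v >> 24);
    p[1] = (unsigned char) (v >> 16);
    p[2] = (unsigned char) (v >> 8);
    p[3] = (unsigned char) v;
}

/*
 * Build a 'K' message into buf: 1 type byte + pid + cancel key.
 * Returns the number of bytes written, which is always 9.
 * The caller must supply a buffer of at least 9 bytes.
 */
size_t
toy_build_key_message(unsigned char *buf, size_t cap,
                      int32_t pid, int32_t cancel_key)
{
    assert(cap >= 9);
    buf[0] = 'K';
    toy_write_be32(buf + 1, (uint32_t) pid);
    toy_write_be32(buf + 5, (uint32_t) cancel_key);
    return 9;
}
```

An 'X' message, by contrast, is just the type byte with an empty body, which is why the worker can send it with a NULL payload and length 0.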
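There is one more case: what if the logic running inside a bgworker raises an error?
It goes through the low-level elog path, which sends an 'E' or 'N' message to this mq. The code is here: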
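Where is the error mq's memory allocated?
As with the other shared-memory structures in the dsm, the space is first allocated through the toc (shm_toc_allocate), then initialized, then inserted into the toc (shm_toc_insert). Later users call the toc lookup function (shm_toc_lookup) with the key to find the space; the key is PARALLEL_KEY_ERROR_QUEUE.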
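The allocate, initialize, insert, look-up pattern can be illustrated with a toy table of contents. This is a simplified model under stated assumptions, not the real shm_toc: the `ToyToc` type, the `toy_toc_*` functions and the value of `TOY_KEY_ERROR_QUEUE` are all hypothetical, and the entries live in an ordinary struct rather than inside the shared segment.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_TOC_MAX_ENTRIES 8
#define TOY_KEY_ERROR_QUEUE UINT64_C(0x1002)    /* arbitrary value for the sketch */

typedef struct ToyTocEntry
{
    uint64_t key;
    void    *address;
} ToyTocEntry;

typedef struct ToyToc
{
    unsigned char *base;        /* start of the managed segment */
    size_t         size;        /* total bytes in the segment */
    size_t         used;        /* bytes handed out so far */
    int            nentries;
    ToyTocEntry    entries[TOY_TOC_MAX_ENTRIES];
} ToyToc;

void
toy_toc_init(ToyToc *toc, void *segment, size_t size)
{
    toc->base = segment;
    toc->size = size;
    toc->used = 0;
    toc->nentries = 0;
}

/* Step 1: carve out a chunk (rounded up to 8 bytes); NULL if out of space. */
void *
toy_toc_allocate(ToyToc *toc, size_t nbytes)
{
    size_t rounded = (nbytes + 7) & ~(size_t) 7;
    void  *chunk;

    if (rounded < nbytes || rounded > toc->size - toc->used)
        return NULL;
    chunk = toc->base + toc->used;
    toc->used += rounded;
    return chunk;
}

/* Step 3: publish the (already initialized) chunk under a key. */
bool
toy_toc_insert(ToyToc *toc, uint64_t key, void *address)
{
    if (toc->nentries >= TOY_TOC_MAX_ENTRIES)
        return false;
    toc->entries[toc->nentries].key = key;
    toc->entries[toc->nentries].address = address;
    toc->nentries++;
    return true;
}

/* Later: any process that knows the key can find the chunk again. */
void *
toy_toc_lookup(const ToyToc *toc, uint64_t key)
{
    for (int i = 0; i < toc->nentries; i++)
    {
        if (toc->entries[i].key == key)
            return toc->entries[i].address;
    }
    return NULL;
}
```

Step 2 (initializing the chunk) sits between the allocate and insert calls and is left to the caller. Publishing only after initialization matters because a reader that finds the key should never see a half-built structure.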