GPDB - 高可用 - 流复制状态
GPDB的高可用基于流复制,通过FTS进行自动故障切换。自动故障切换需要根据primary-mirror流复制的各种状态进行判断。本节就聊聊primary-mirror流复制的各种状态。同样适用于PgSQL
1、WalSndState
typedef enum WalSndState
{
WALSNDSTATE_STARTUP = 0,
WALSNDSTATE_BACKUP,
WALSNDSTATE_CATCHUP,
WALSNDSTATE_STREAMING,
WALSNDSTATE_STOPPING
} WalSndState;
WalSndState保存的是wal sender进程的状态信息,变量值如上代码。
WALSNDSTATE_STARTUP表示启动状态;
WALSNDSTATE_BACKUP表示备份状态
WALSNDSTATE_CATCHUP表示追赶状态
WALSNDSTATE_STREAMING表示流复制状态
WALSNDSTATE_STOPPING表示wal sender即将退出
2、什么时候切换到WALSNDSTATE_STOPPING
1)集群shutdown有三种方式:smart、fast、immediate
三种标记值分别为:
#define SmartShutdown 1
#define FastShutdown 2
#define ImmediateShutdown 3
Smart shutdown:不允许有新连接,待已有连接全部结束后关闭数据库;
Fast shutdown:不允许新连接,向所有活跃的服务进程发送SIGTERM信号,让他们立即退出,之后等待所有子进程退出并关闭数据库
Immediate shutdown:不允许新连接,主进程postgres向所有子进程发送SIGQUIT信号并立即退出,所有子进程也会立即退出。下次启动会回放WAL日志进行恢复。
2)如果shutdown模式不为immediate,则集群shutdown的时候,postgres主进程会向checkpoint进程发送SIGUSR2信号:
3)checkpoint进程的SIGUSR2信号处理函数为ReqShutdownHandler,从上图的代码逻辑可见,ReqShutdownHandler会将shutdown_requested置为true,并唤醒MyLatch。
4)checkpoint进程接着调用ShutdownXLog,然后proc_exit(0)退出checkpoint进程。
5)ShutdownXLog函数调用WalSndInitStopping向所有sender进程发送SIGUSR1信号;然后调用WalSndWaitStopping等待所有sender进程退出,每个10ms判断一次。
6)sender进程SIGUSR1信号处理函数procsignal_sigusr1_handler检查信号来自PROCSIG_WALSND_INIT_STOPPING,然后将got_STOPPING置为true
7)流复制的sender处理完SIGUSR1信号后,继续返回信号前处理流程。Sender的发送日志函数为XLogSendPhysical,此时got_STOOPING已为true,所以调用WalSndSetState将walsnd->state切换到WALSNDSTATE_STOPPING状态,然后调用FTSReplicationStatusUpdateForWalState更新WAL复制状态
8)另外当sender进程从WalSndLoop退出后(replication_active置为false),这个时候,Wal sender进程才接收到信号,HandleWalSndInitStopping中也可以看到,会向自己发送SIGTERM信号,信号处理函数die,即退出进程(因为流复制终止了,不必管它了)。
9)若,sender进程还没从WalSndLoop退出(replication_active置为true),这个时候,Wal sender进程接收到信号,HandleWalSndInitStopping中也可以看到,他会设置got_STOPPING为true,让WAL sender进程发送完WAL后退出WalSndLoop循环后调用proc_exit自行退出。
2、sender进程什么时候退出?
书接上文,产生个问题:WalSndLoop何时退出?若没有shutdown,何时再发起流复制?
Wal sender进程接收到mirror发来的start replication命令后,进入StartReplication开始流复制。
1)WalSndLoop循环中,通过XLogSendPhysical函数不断发送WAL
2)XLogSendPhysical函数发送WAL达到一个时间线的末尾节点位置时,向mirror的receiver进程发送CopyDone消息,即开头为‘c’的消息,并将streamingDoneSending变量改为true
3)receiver进程的入口函数WalReceiverMain,通过walrcv_receive::libpqrcv_receive不断接收WAL日志和消息。当接收到发来的CopyDone消息后返回-1
4)接着,返回到WalReceiverMain函数中,当walrcv_receive返回-1后,一路下来会退出接收消息和日志的循环,并进入walrcv_endstreaming再向primary发送个CopyDone消息
5)primary的ProcessRepliesIfAny处理mirror发来的消息,当接收到CopyDone消息后,将streamingDoneReceiving改为true
6)返回WalSndLoop循环,当streamingDoneSending和streamingDoneReceiving都为true时退出循环
总结一句话:primary发完一个时间线内的WAL,切换下一个时间线时,会退出发送WAL日志的循环stop streaming;当然mirror的receiver进程发起下一个时间线的日志拉取,即再次调用libpqrcv_startstreaming函数向primary发送START_REPLICATION命令后,primary仍旧会再次进入WalSndLoop循环发送WAL日志。
3、什么时候进入WALSNDSTATE_BACKUP?
exec_replication_command:进行基础备份的时候
exec_replication_command:进行基础备份的时候
switch (cmd_node->type){
case T_BaseBackupCmd:
PreventInTransactionBlock(true, "BASE_BACKUP");
SendBaseBackup((BaseBackupCmd *) cmd_node);
| parse_basebackup_options(cmd->options, &opt);
| WalSndSetState(WALSNDSTATE_BACKUP);
| perform_base_backup(&opt);
break;
...
}
进行基础备份,也就是构建mirror的时候进入该状态。
4、什么时候进入WALSNDSTATE_STARTUP?
1)sender进程刚fork出来,InitWalSenderSlot初始化的时候
2)WalSndLoop进程退出后又进入startup状态,因为下个时间线的复制即将开始
3)sender进程遇到ERROR故障,跳回到PostgresMain回退操作处,回退事务后,进入WalSndErrorCleanup,若没有stop则重新设置为startup状态,等待接收start replication命令重新开始复制。
PostgresMain
if (am_walsender)
InitWalSender();//sender进程的初始化
|-- InitWalSenderSlot
|-- for (i = 0; i < max_wal_senders; i++){
| WalSnd *walsnd = &WalSndCtl->walsnds[i];
| SpinLockAcquire(&walsnd->mutex);
| if (walsnd->pid != 0){
| //找一个空闲的slot
| SpinLockRelease(&walsnd->mutex);
| continue;
| }else{
| walsnd->pid = MyProcPid;
| walsnd->state = WALSNDSTATE_STARTUP;
| ...
| break;
| }
| }
|-- on_shmem_exit(WalSndKill, 0);
StartReplication:sender的WalSndLoop退出后又进入startup状态
WalSndLoop(XLogSendLogical);
...
if (got_STOPPING)
proc_exit(0);
WalSndSetState(WALSNDSTATE_STARTUP);
EndCommand("COPY 0", DestRemote);
PostgresMain
//sender进程遇到ERROR报错,sender进程需要再次start replication才能进入传输wal
if (sigsetjmp(local_sigjmp_buf, 1) != 0){
AbortCurrentTransaction();
if (am_walsender)
WalSndErrorCleanup();
|-- if (got_STOPPING || got_SIGUSR2)
| proc_exit(0);
|-- WalSndSetState(WALSNDSTATE_STARTUP);
...
for (;;){
firstchar = ReadCommand(&input_message);
switch (firstchar){
case 'Q':
{
if (am_walsender){
if (!exec_replication_command(query_string))
exec_simple_query(query_string);
}else if (am_ftshandler)
HandleFtsMessage(query_string);
else if (am_faulthandler)
HandleFaultMessage(query_string);
else
exec_simple_query(query_string);
send_ready_for_query = true;
break;
}
case 'M':
...
}
}
5、什么时候进入WALSNDSTATE_CATCHUP?
开始流复制前,设置成catchup状态。
StartReplication:开始流复制前
WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
pq_sendbyte(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage(&buf);
pq_flush();
WalSndLoop(XLogSendLogical);
...
6、什么时候进入WALSNDSTATE_STREAMING?
当前时间线内没有要发送的日志了,并且没有下一个时间线需要切换发送日志,则将其改为streaming状态。
WalSndLoop
for (;;){
if (!pq_is_send_pending())
send_data();
else
WalSndCaughtUp = false;
...
//现在没有要发送的了
if (WalSndCaughtUp && !pq_is_send_pending()){
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
}
...
}