文章目录
- 一、物理复制概述
- 二、同步流复制
- 三、pg_basebackup原理
- 源码剖析
- 主流程
- 具体过程
- 四、主从原理
- 基本介绍
- 1、 WAL日志文件复制
- 2、流复制(Streaming Replication)
- 主从原理
- 主从间的通信
- walsender与walreceiver过程
- 发生故障时的行为
- 扩展问题
一、物理复制概述
数据库服务器通常都允许存在一个与主库同步的在线备数据库服务器,当主数据库服务器出故障时,备数据库服务器就可以快速提升为主服务器并提供服务,从而实现数据库服务的高可用。此外,还允许多台数据库服务器同时提供负载均衡服务。因为数据库内部记录的是数据,当多台数据库同时提供服务时,不会像WEB服务器那么简单,而是存在着数据同步等问题。通常是一台主数据库提供读写,然后把数据同步到另一台备数据库上,这台备数据库不断应用(apply)从主数据库发来的变化数据。这台备服务器不能提供写服务,只提供只读服务。在 PostgreSQL中能提供读写全功能的服务器,称为Primary database 或 masterdatabase;若备份数据库在接收主数据库同步数据和应用同步数据时,不能提供只读服务,则该备份数据库称之为 warm standby server ;而如果在接收同步数据和应用同步数据时也能提供只读操作,则称该备份数据库为 hot standby server。hot standby 功能是 PostgreSOL 9.在 PostgreSQL 9.0 之前的版本中,没有流复制的功能,基本只能一个个传送 WAL 日志文件(除非使用第三方的软件),所以备库最少比主库落后一个 WAL日志文件,当故障出现后,使用Standby 数据库接管数据库服务时,丢失的数据会比较多。PostgreSQL9.0后,提供了流复制功能,实现了主库产生一点日志后,就会马上传送到备库上去,从而一般只丢失最多几秒的数据PostgreSOL9.1中,流复制的功能得到了进一步的提升,实现了同步复制的功能,这样主备切换后,就不存在数据丢失的问题。有人会问,如果同步复制,当备库出现问题时,会不会导致主库也会被 hang 住?通常会导致这个问题,但 PostgreSQL 提供了多个 Standby 数据库的功能,如配置两个 Standby 数据库,当其中一个 Standby 数据库坏掉时,主数据库不会被 hang 住,两个备数据库都出现问题时,才会导致主数据库不能写。PostgreSQL9.2之后,增加了级连复制的功能,也就是一个 Standby 数据库后面可以再级连另一个 Standby 数据库,也就是说其他 Standby数据库不必都从主数据库上传递 WAL 日志,而是可以从 Standby 数据库传递 WAL 日志。
二、同步流复制
同步流复制的架构
PostgreSQL 的流复制是异步的,异步的缺点是 Standby上的数据落后于主库上的数据如果使用 Hot Standby 做读写分离,就会存在数据一致性的问题,这对于一些一致性较高的应用来说是不可接受的。所以 PostgreSQL 从 9.1版本之后提供了同步流复制的架构。同步复制要求在数据写人 Standby 数据库后,事务的 commit 才返回,所以 Standby 库出现问题时,会导致主库被 hang 住。解决这个问题的方法是启动两个 Standby 数据库,这两个 Standby数据库只要有一个是正常的,就不会让主库 hang住。所以在实际应用中,同步流复制,总是有1个主库和2个以上的 Standby 库。
关于配置过程这里不再详细介绍,只说一下同步相对异步需要注意的点:
- 需要在主库的
postgresql.conf
文件中设置synchronous_commit
参数为on
或remote_write
、remote_apply
,以及设置synchronous_standby_names
参数来指定一个或多个从库的application_name
,这些从库将参与同步复制。
以下主要介绍同步流复制之间一些要注意的点:
假设有db01,db02,db03三个主机,db01作主库,其余两个作备库。
下面测试同步复制功能。
先关掉一台 Standby(db02),看主库是否能正常工作,命令如下:
osdba@db02:~spgstop
waiting for server to shut down… done
server stopped
然后到主库上做如下操作:
postgres=# insert into test0l values(l,‘1111’);
INSERT 01postgres=#
可以看到,当一台 Standby 损坏时,主库是不受影响的。再关掉一台 Standby(db03 ),看主库是否能正常工作,命令如下:
osdba@db03:~$pgstop
waiting for server to shut down…
server stopped
在主库上做如下操作:
postgres=# select *from test01;
id | note
----十------
1 | 1111
(1 row)
postgres=# insert into test0l values(2,‘2222’);
可以发现主库的非更新查询都是正常的,但更新操作都被hang住了。这时再启动一台Standby(db02),命令如下:
osdba@db02:~$ pgstart
server starting
可以发现主库 hang 住的操作可以继续下去了,如下:
postgres=# insert into test01 values(2,‘2222’);
INSERT O1
postgres=#
可以发现 db03 又从“sync”状态变成了“potential”,db02 重新变成了"同步状态"。
三、pg_basebackup原理
建 Standby 的过程可分为两个大步骤,第一个大步骤是生成一个基础备份,第二个大步骤是把基本备份拷贝到备机上,配置相关文件和参数把备库启动在 Standby 模式下,这样就完成了 Standby 库的搭建。
生成基础备份的步骤如下:
1)以数据库超级用户身份连接到数据库,发出命令:
SELECT pg_start_backup(‘label’);
2)执行备份。使用任何方便的文件系统工具比如tar,或直接把数据目录复制下来。这些操作过程中既不需要关闭数据库,也不需要停止数据库的任何操作。
3)再次以数据库超级用户身份连接数据库,然后发出命令:
SELECT pg_stop_backup();
这将中止备份模式并自动切换到下一个 WAL段。 自动切换是为了让在备份间隔中写人的最后一个 WAL, 段文件可以立即为下次备份做好准备。
4)拷贝备份过程中产生的 WAL 日志文件:
在上面的步骤中,有人可能会问为什么用备份数据库前需要执行 pg_start_backup()?
实际上 pg_start_backup()主要做了以下两个工作:
- 设置写日志标志为:XLogCtl->Insert.forcePageWrites=true,也就是把这个标志设置为true 后,数据库会把变化的整个数据块都记录到数据库中,而不仅仅是块中记录的变化。
- 强制发生一次 checkpoint 点。
为什么要强制 WAL 日志把整个块都写人 WAL, 中呢?想象一下:如果用 cp 命令拷贝文件时,数据库在拷贝的同时也在写这个文件,那么这就会出现一个数据块里有两种操作,数据库正在写、cp 命令正在读,这样有可能使得拷贝的数据块前半部分是新数据,后半部分是旧数据,也就是说单个数据块的数据不一致。这时,如果后面使用 WAL 日志把数据推到一个一致点时,由于 WAL 日志中只记录块中行的变化,因此会导致不一致块无法恢复。但如果WAL 日志中记录的是整个新数据块的内容,那么重演 WAL 日志时,会把整个新块的内容写到块中,这样块的内容就达到一致点了。
强制发生一次 checkpoint,也是为了把前面的脏数据都刷到磁盘中,这样从这之后产生的日志就记录整个数据块,可以保证恢复的正确。
实际上 PostgreSQL为了方便,已经提供了一个命令行工具 pg basebackup 来完成上面基础备份的步骤。
物理备份现在常用的方法是使用pg_basebackup就行备份
pg_basebackup -D backup -Ft -z -P
如果没有指定备份的ip地址、端口、用户的情况下,就是对本地备份,和直接执行psql一样。
源码剖析
涉及到的代码主要在src/backend/replication
以及bin/pg_basebackup
中。
其实,pg_basebackup工具就是对底层API的封装,其主要过程是相同的,但具体到代码,并不是直接调用的pg_start_backup,pg_stop_backup函数,而是通过一些命令的形式,这些特殊的命令定义在src/backend/replication/repl_gram.y中,后面我们会进行分析。
主流程
pg_basebackup执行基础备份的主要流程如下,其中,涉及到libpq协议与服务端进行连接,通信,向服务端发送一些特殊的命令语句,这些命令的解析在src/backend/replication/repl_gram.y
中可以查看到具体的语法定义。主流程如下:
main(int argc, char **argv)
--> GetConnection(); // 连接服务端,(例如主节点)
--> PQconnectdbParams(keywords, values, true);
--> BaseBackup(); // 执行基础备份
--> GenerateRecoveryConfig(conn, replication_slot); // 用于生成primary_conninfo配置信息
--> PQconninfo(pgconn);
--> RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL)
basebkp = psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s %s %s",
escaped_label,
estimatesize ? "PROGRESS" : "",
includewal == FETCH_WAL ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
maxrate_clause ? maxrate_clause : "",
format == 't' ? "TABLESPACE_MAP" : "",
verify_checksums ? "" : "NOVERIFY_CHECKSUMS",
manifest_clause ? manifest_clause : "",
manifest_checksums_clause);
// 向服务端发送执行命令"BASE_BACKUP LABEL 'pg14bak' PROGRESS NOWAIT MANIFEST 'yes' "
--> PQsendQuery(conn, basebkp)
--> PQgetResult(conn); // Get the starting WAL location
--> StartLogStreamer(xlogstart, starttli, sysidentifier);
--> CreateReplicationSlot(param->bgconn, replication_slot, NULL, temp_replication_slot, true, true, false)
// 执行命令 "CREATE_REPLICATION_SLOT \"pg_basebackup_2553309\" TEMPORARY PHYSICAL RESERVE_WAL"
--> PQexec(conn, query->data);
// Start a child process and tell it to start streaming.
// 创建一个单独的子进程用于日志传输
bgchild = fork();
if (bgchild == 0)
{
/* in child process */
LogStreamerMain(param);
--> ReceiveXlogStream(param->bgconn, &stream) // Receive a log stream starting at the specified position.
}
if (!writing_to_stdout && manifest)
ReceiveBackupManifest(conn); // receive backup manifest
整个的过程,最重要的有3点:
- 在进行备份前,执行一次checkpoint,记录开始的位置,在服务端接收到BASE_BACKUP LABLE命令后,生成备份标签文件backup_lable,这个文件最重要的作用是记录数据库恢复的起始位置。当启动备份实例时,会读该文件进行恢复。
- 复制数据库数据文件
- 日志复制
我们可以看一下backup_lable
文件中的内容:
postgres@postgres:~/pgsql/pgbak$ cat backup_label
START WAL LOCATION: 0/C000028 (file 00000001000000000000000C) 备份开始时日志的位置
CHECKPOINT LOCATION: 0/C000060 检查点的位置
BACKUP METHOD: streamed 备份方法
BACKUP FROM: primary 备份源
START TIME: 2024-07-26 17:15:58 CST 备份开始的物理时间
LABEL: pg14bak 备份标签
START TIMELINE: 1
具体过程
仅看上面的主流程还是有一些不清楚的地方的。
这里有个很重要的命令BASE_BACKUP LABEL
,备份命令,获取XLOG的存放路径和备份开始时日志的位置,那么服务端这块是怎么处理的呢?我们看一下服务端的相关代码:
void PostgresMain(int argc, char *argv[], const char *dbname, const char *username)
{
// ...
if (am_walsender)
WalSndSignals();
/* Perform initialization specific to a WAL sender process. */
if (am_walsender)
InitWalSender();
for (;;)
{
firstchar = ReadCommand(&input_message);
switch (firstchar)
{
case 'Q': /* simple query */
{
const char *query_string;
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
if (am_walsender)
{
if (!exec_replication_command(query_string))
exec_simple_query(query_string);
}
else
exec_simple_query(query_string);
send_ready_for_query = true;
}
break;
}
}
/* Execute an incoming replication command.*/
bool exec_replication_command(const char *cmd_string)
{
// ...
/* Looks like a WalSender command, so parse it. */
parse_rc = replication_yyparse();
if (parse_rc != 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg_internal("replication command parser returned %d",
parse_rc)));
switch (cmd_node->type)
{
case T_IdentifySystemCmd:
cmdtag = "IDENTIFY_SYSTEM";
set_ps_display(cmdtag);
IdentifySystem();
EndReplicationCommand(cmdtag);
break;
case T_BaseBackupCmd:
cmdtag = "BASE_BACKUP";
set_ps_display(cmdtag);
PreventInTransactionBlock(true, cmdtag);
SendBaseBackup((BaseBackupCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_CreateReplicationSlotCmd:
cmdtag = "CREATE_REPLICATION_SLOT";
set_ps_display(cmdtag);
CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_DropReplicationSlotCmd:
cmdtag = "DROP_REPLICATION_SLOT";
set_ps_display(cmdtag);
DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
cmdtag = "START_REPLICATION";
set_ps_display(cmdtag);
PreventInTransactionBlock(true, cmdtag);
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
StartReplication(cmd);
else
StartLogicalReplication(cmd);
/* dupe, but necessary per libpqrcv_endstreaming */
EndReplicationCommand(cmdtag);
Assert(xlogreader != NULL);
break;
}
case T_TimeLineHistoryCmd:
cmdtag = "TIMELINE_HISTORY";
set_ps_display(cmdtag);
PreventInTransactionBlock(true, cmdtag);
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_VariableShowStmt:
{
DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
VariableShowStmt *n = (VariableShowStmt *) cmd_node;
cmdtag = "SHOW";
set_ps_display(cmdtag);
/* syscache access needs a transaction environment */
StartTransactionCommand();
GetPGVariable(n->name, dest);
CommitTransactionCommand();
EndReplicationCommand(cmdtag);
}
break;
default:
elog(ERROR, "unrecognized replication command node tag: %u",
cmd_node->type);
}
// ...
}
我们继续看一下SendBaseBackup
函数的实现。
/*
* SendBaseBackup() - send a complete base backup.
*
* The function will put the system into backup mode like pg_start_backup()
* does, so that the backup is consistent even though we read directly from
* the filesystem, bypassing the buffer cache.
*/
void SendBaseBackup(BaseBackupCmd *cmd)
{
basebackup_options opt;
SessionBackupState status = get_backup_status();
if (status == SESSION_BACKUP_NON_EXCLUSIVE)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("a backup is already in progress in this session")));
parse_basebackup_options(cmd->options, &opt);
WalSndSetState(WALSNDSTATE_BACKUP);
if (update_process_title)
{
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "sending backup \"%s\"",
opt.label);
set_ps_display(activitymsg);
}
perform_base_backup(&opt);
}
主流程如下:
SendBaseBackup(BaseBackupCmd *cmd)
--> perform_base_backup(&opt);
// creates the necessary starting checkpoint and constructs the backup label file.
--> do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli, labelfile, &tablespaces, tblspc_map_file);
--> RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_WAIT | (fast ? CHECKPOINT_IMMEDIATE : 0));
--> CreateCheckPoint(flags | CHECKPOINT_IMMEDIATE);
四、主从原理
基本介绍
PostgrepSQL在数据目录的子目录pg_xlog子目录中维护了一个WAL日志文件,可以把WAL日志备份到另外一台备份服务器,通过重做WAL日志的方式在备服务器上恢复数据(类似Oracle的redo日志)。
WAL日志复制到另外一台备份服务器可以有两种方式:
1、 WAL日志文件复制
此种方式是写完一个WAL日志后,才把WAL日志文件拷贝到备份数据库中。这样通常备份会落后主库一个WAL日志文件,当主数据库发生故障时,主数据库的WAL文件并没有填充完毕未传输(默认16MB)、或者时延等原因导致WAL文件没有传输完毕,会导致被数据库可能存在一定的数据丢失。此种方式是postgreSQL9.当潜在或异步备库发生故障时,主库会终止连接到故障备库的walsender进程,并继续进行所有处理。换而言之,主库上的事务处理不会受到这两种备库的影响。
采用此方式的WAL复制,需要:
- 主数据库的wal_level配置为archive或以上。
- PostgreSQL 9.1之后提供了一个很方便的工具pg_basebackup,使用完成一次基础备份到备数据库。
- 后续产生WAL文件,可以通过archive_command参数调度命令传输至备机。
2、流复制(Streaming Replication)
流复制是PostgreSQL 9.0之后才提供的新的传递WAL日志的方法。通过流复制,备库不断的从主库同步相应的数据,并在备库apply每个WAL record,这里的流复制每次传输单位是WAL日志的record。它的好处是只要主库一产生日志,就会马上传递到备库,同WAL日志文件相比有更低同步延迟。
同时PostgreSQL9.0之后提供了Hot Standby能力,备库在应用WAL record的同时也能够提供只读服务。
WAL流复制支持同步、异步方式:
- 异步流复制模式中,主库提交的事务不会等待备库接收WAL日志流并返回确认信息,因此异步流复制模式下主库与备库的数据版本上会存在一定的处理延迟,延迟的时间主要受主库压力、备库主机性能、网络带宽等影响,当正常情况下,主备的延迟通常在毫秒级的范围内,当主库宕机,这个延迟就主要受到故障发现与切换时间的影响而拉长,不过虽然如此,这些数据延迟的问题,可以从架构或相关自动化运维手段不断优化设置。
- 同步流复制模式中,要求主库把WAL日志写入磁盘,同时等待WAL日志记录复制到备库、并且WAL日志记录在任何一个备库写入磁盘后,才能向应用返回Commit结果。一旦所有备库故障,在主库的应用操作则会被挂起,所以此方式建议起码是1主2备。
主从原理
PG主备流复制的核心由三个进程组成:
- walsender:用于主库发送WAL日志记录至从库
- walreceiver:用于从库接收主库的WAL日志记录
- startup:用于从库apply日志
先简单了解一下基本流程
(1)启动主、备服务器
(2)备节点启动startup进程
(3)备节点启动walreceiver进程
(4)walreceiver进程向主节点发送连接请求,如果主库尚未启动,walreceiver会定期重发该请求
(5)当主节点收到连接请求时,将启动walsender进程,并建立walsender与walreceiver之间的TCP连接
(6)walreceiver发送备节点最新的LSN,这个阶段在IT领域称为握手机制
(7)如果备库最新LSN小于主库最新LSN(落后),walsender会将前一个LSN到后一个LSN之间的wal数据发送到walreceiver。这个阶段就是备库追赶主库的阶段。
(8)流复制开始工作
主从间的通信
- 后端进程通过执行函数
XLogInsert()
和XLogFlush()
,将WAL数据写入并刷新到WAL段文件中。- walsender进程将写入WAL段文件的WAL数据发送到walreceiver进程。
- 在发送WAL数据之后,后端进程继续等待来自备库的ACK响应。更确切地说,后端进程通过执行内部函数
SyncRepWaitForLSN()
来获取锁存器(latch),并等待它被释放。- 备库上的walreceiver通过
write()
系统调用,将接收到的WAL数据写入备库的WAL段,并向walsender返回ACK响应。- walreceiver通过系统调用(例如
fsync()
)将WAL数据刷新到WAL段中,向walsender返回另一个ACK响应,并通知**启动进程(startup process )**相关WAL数据的更新。- 启动进程重放已写入WAL段的WAL数据。
- walsender在收到来自walreceiver的ACK响应后释放后端进程的锁存器,然后,后端进程完成
commit
或abort
动作。 锁存器释放的时间取决于参数synchronous_commit
。如果它是'on'
(默认),当接收到步骤(5)的ACK时,锁存器被释放。而当它是'remote_write'
时,接收到步骤(4)的ACK时,即被释放。
每个ACK响应将备库的内部信息通知给主库。包含以下四个项目:
- 已写入最新WAL数据的LSN位置。
- 已刷新最新WAL数据的LSN位置。
- 启动进程已经重放最新的WAL数据的LSN。
- 发送此响应的时间戳。
/* XLogWalRcvSendReply(void) */
/* src/backend/replication/walreceiver.c */
/* 构造一条新消息 */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
reply_message.apply = GetXLogReplayRecPtr();
reply_message.sendTime = now;
/* 为消息添加消息类型,并执行发送 */
buf[0] = 'r';
memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
walreceiver不仅在写入和刷新WAL数据时返回ACK响应,而且还定期发送备库的心跳响应。因此,主库始终掌握所有连接备库的状态。
心跳的间隔设置为参数wal_receiver_status_interval
,默认为10秒。
执行如下查询,可以显示所连接备库的相关LSN信息。
testdb=# SELECT application_name AS host,
write_location AS write_LSN, flush_location AS flush_LSN,
replay_location AS replay_LSN FROM pg_stat_replication;
host | write_lsn | flush_lsn | replay_lsn
----------+-----------+-----------+------------
standby1 | 0/5000280 | 0/5000280 | 0/5000280
standby2 | 0/5000280 | 0/5000280 | 0/5000280 (2 rows)
walsender与walreceiver过程
主要分为以下几个流程:
①主备数据库启动,备库启动startup进程,备库启动walreceiver进程,wal进程向主库发送连接请求。
②主库收到连接请求后启动walsender进程,并与walreceiver进程建立tcp连接。
③备库walreceiver进程发送最新的wal lsn给主库。
④主库进行lsn对比,定期向备库发送心跳信息来确认备库可用性,并且将没有传递的wal日志进行发送,同时调用SyncRepWaitForLSN()函数来获取锁存器,并且等待备库响应,锁存器的释放时机和主备同步模式的选择有关。
④备库调用操作系统write()函数将wal写入缓存,然后调用操作系统fsync()函数将wal刷新到磁盘,然后进行wal回放。同时备库向主库返回ack信息,ack信息中包含write_lsn、flush_lsn、replay_lsn,这些信息会发送给主库,用以告知主库当前wal日志在备库的应用位置及状态,相关位置信息可以通过pg_stat_replication视图查看。
⑤如果启用了hot_standby_feedback参数,备库会定期向主库发送xmin信息,用以保证主库不会vacuum掉备库需要的元组信息。
发生故障时的行为
我们再来看看当从库发生故障时主库的表现。
- 当潜在或异步备库发生故障时,主库会终止连接到故障备库的walsender进程,并继续进行所有处理。换而言之,主库上的事务处理不会受到这两种备库的影响。
- 当同步备库发生故障时,主库将终止连接到故障备库的walsender进程,并使用具有最高优先级的潜在备库替换首要同步备库。与上述的故障相反,主库将会暂停从失效点到成功替换同步备库之间的查询处理。
扩展问题
备节点长期停机再启动后,会发生什么?
-
9.4以前,如果备节点请求的wal段在主节点已被覆盖,那么备节点将无法追上主节点。这个问题没有什么好的解决方案,只能把wal_keep_segments参数增大,减少发生的可能性。
-
9.4开始,这个问题可以使用复制槽(replication slot)来预防——通过暂停walreceiver进程,将含有未发送wal段的pg_xlog保存在复制槽中。复制槽可提高wal数据发送灵活性性,主要用于逻辑复制。