一、 准备工作
1. Publication(发布)节点
- postgersql.conf
wal_level = logical
max_replication_slots = 8 #每个订阅需要消耗一个slot
max_wal_senders = 10 #每个订阅需要使用一个wal sender
max_worker_processes=128 #必须 >=max_wal_senders加并行计算进程,或者其他插件需要fork的进程数。
- pg_hba.conf
host replication repuser 192.0.0.0/8 md5
2. Subscription(订阅)节点
- 初始化环境
initdb -D $PGDATA -E UTF8 --locale=en_US.utf8
pg_ctl –D $PGDATA start
- postgersql.conf
max_replication_slots = 8
max_logical_replication_workers = 8 # 保证不低于该实例总共需要创建的订阅数
max_worker_processes=128 # 大于等于max_logical_replication_workers + 1 + CPU并行计算 + 其他插件需要fork的进程数.
二、 发布与订阅
1. 创建测试表并设置replica identity
create table test(id int primary key, info text, crt_time timestamp);
\d test
alter table test replica identity using index test_pkey;
\d test
2. 发布表
create publication pub1 for table test;
select * from pg_publication;
3. 订阅端创建订阅
注意整句基本都是要改的
create subscription sub_from_pub1 connection 'host=192.168.0.108 port=5432 user=postgres password=postgres' publication pub1;
4. 发布端插入数据
insert into test select generate_series(1, 10), 'test', now();
5. 订阅端查询
select * from test;
三、 相关视图
1. 发布端信息 pg_publication
PostgreSQL: Documentation: 14: 52.39. pg_publication
postgres=# select * from pg_publication;
-[ RECORD 1 ]+------
oid | 16559
pubname | pub1
pubowner | 10
puballtables | f
pubinsert | t
pubupdate | t
pubdelete | t
pubtruncate | t
pubviaroot | f
2. 订阅端信息pg_subscription
PostgreSQL: Documentation: 14: 52.52. pg_subscription
postgres=# select * from pg_subscription;
-[ RECORD 1 ]---+-------------------------------------------------------------
oid | 16392
subdbid | 13893
subname | sub_from_pub1
subowner | 10
subenabled | t
subbinary | f
substream | f
subconninfo | host=192.168.0.108 port=5432 user=postgres password=postgres
subslotname | sub_from_pub1
subsynccommit | off
subpublications | {pub1}
3. 复制状态 pg_stat_replication
postgres=# select * from pg_stat_replication ;
-[ RECORD 1 ]----+------------------------------
pid | 19998
usesysid | 10
usename | postgres
application_name | sub_from_pub1
client_addr | 192.168.0.109
client_hostname |
client_port | 40421
backend_start | 2023-01-26 12:56:22.065934+08
backend_xmin |
state | streaming
sent_lsn | 0/3002D928
write_lsn | 0/3002D928
flush_lsn | 0/3002D928
replay_lsn | 0/3002D928
write_lag |
flush_lag |
replay_lag |
sync_priority | 0
sync_state | async
reply_time | 2023-01-26 12:57:14.031458+08
查询逻辑复制延迟
select state,
backend_start,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_insert_lsn(), sent_lsn)) send_gap,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_insert_lsn(), replay_lsn)) apply_gap,
sent_lsn,
write_lsn,
flush_lsn,
replay_lsn,
sync_state,
reply_time
from pg_stat_replication;
-[ RECORD 1 ]-+------------------------------
state | streaming
backend_start | 2023-01-26 12:56:22.065934+08
send_gap | 0 bytes
apply_gap | 0 bytes
sent_lsn | 0/3002E178
write_lsn | 0/3002E178
flush_lsn | 0/3002E178
replay_lsn | 0/3002E178
sync_state | async
reply_time | 2023-01-26 13:48:59.898499+08
4. 复制槽信息 pg_replication_slots
postgres=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+--------------
slot_name | sub_from_pub1
plugin | pgoutput
slot_type | logical
datoid | 13893
database | postgres
temporary | f
active | t
active_pid | 19998
xmin |
catalog_xmin | 870
restart_lsn | 0/3002D8F0
confirmed_flush_lsn | 0/3002D928
wal_status | reserved
safe_wal_size |
two_phase | f
5. 查看订阅端状态 pg_stat_subscription
其中最主要的是订阅端接收的lsn信息
PostgreSQL: Documentation: 14: 28.2. The Statistics Collector
select postgres-# pg_size_pretty(pg_wal_lsn_diff(received_lsn, latest_end_lsn)),
* from pg_stat_subscription;
-[ RECORD 1 ]---------+------------------------------
pg_size_pretty | 0 bytes
subid | 16392
subname | sub_from_pub1
pid | 8553
relid |
received_lsn | 0/3002DA10
last_msg_send_time | 2023-01-26 12:59:36.582129+08
last_msg_receipt_time | 2023-01-26 12:59:38.393401+08
latest_end_lsn | 0/3002DA10
latest_end_time | 2023-01-26 12:59:36.582129+08
6. 查看订阅端replay进度 pg_replication_origin_status
-
remote_lsn:已复制到订阅端的发布端 lsn
-
local_lsn:本地已持久化(写入事务日志文件)的lsn,只有在此之前的脏页才能刷入磁盘
PostgreSQL: Documentation: 14: 52.80. pg_replication_origin_status
postgres=# select * from pg_replication_origin_status;
local_id | external_id | remote_lsn | local_lsn
----------+-------------+------------+-----------
1 | pg_16392 | 0/3002E058 | 0/170E998
四、 设置全部发布
1. 发布端
postgres=# drop publication pub1 ;
DROP PUBLICATION
postgres=# create publication pub1 for all tables ;
CREATE PUBLICATION
发布all table的情况下,测试新建一个表并插入数据,看订阅端能否识别
postgres=# create table test5(id int, crt_time timestamp);
CREATE TABLE
postgres=# insert into test5 values(1, now());
INSERT 0 1
2. 订阅端
可以看到数据没有自动同步
postgres=# create table test5(id int, crt_time timestamp);
CREATE TABLE
postgres=# select * from test5;
id | crt_time
----+----------
(0 rows)
刷新订阅端
alter subscription sub_from_pub1 refresh publication;
发布了所有表订阅端需要先创建对应表,或者把发布端无用表删除,否则会报错。
再次查询
参考
PGCA课程《第46-47讲PostgreSQL 逻辑复制》