文章目录
- 0.简介
- 1.PG通讯协议
- 1.1 消息格式
- 1.2 消息交互流程
- 1.2.1 启动流程
- 1.2.2 简单查询流程
- 1.2.3 扩展查询
- 1.2.3.1 pipelining
- 1.2.4 取消流程
- 1.2.5 结束流程
- 1.2.6 copy流程
- 1.2.7 错误和通知
0.简介
之前文章对于PG的内部模块做了一些介绍,接下来对PG和外部交互的部分进行介绍,本文主要介绍通讯协议的概念和PG中的消息格式和交互流程。
1.PG通讯协议
通讯协议,就是通信双方对数据传输和解析的一种约定。遵守协议的双方可以进行正常的“交流”。对于PG,其外部的客户端(像psql,JDBC,PgAdmin等)都需要遵循PG的通信协议才能正确的进行通信,下面主要针对PG 7.4 版本开始使用的 Protocol v3.0 版本介绍,该协议支持 TCP/IP 和 Unix 域套接字。下面从数据格式,消息类型和传输流程来分别进行介绍。
1.1 消息格式
消息的格式是:第一个字节标识消息的类型,随后用四个字节标识消息的长度,然后是消息体。
其解析过程可以参考SocketBackend函数
static int
SocketBackend(StringInfo inBuf)
{
int qtype;
/*
* Get message type code from the frontend.
*/
HOLD_CANCEL_INTERRUPTS();
pq_startmsgread();
qtype = pq_getbyte();
if (qtype == EOF) /* frontend disconnected */
{
//
}
/*
* Validate message type code before trying to read body; if we have lost
* sync, better to say "command unknown" than to run out of memory because
* we used garbage as a length word.
*
* This also gives us a place to set the doing_extended_query_message flag
* as soon as possible.
*/
switch (qtype)
{
case 'Q': /* simple query */
doing_extended_query_message = false;
if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
{
/* old style without length word; convert */
if (pq_getstring(inBuf))
{
if (IsTransactionState())
ereport(COMMERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection with an open transaction")));
else
{
/*
* Can't send DEBUG log messages to client at this
* point. Since we're disconnecting right away, we
* don't need to restore whereToSendOutput.
*/
whereToSendOutput = DestNone;
ereport(DEBUG1,
(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
errmsg("unexpected EOF on client connection")));
}
return EOF;
}
}
break;
case 'F': /* fastpath function call */
doing_extended_query_message = false;
if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
{
if (GetOldFunctionMessage(inBuf))
{
if (IsTransactionState())
ereport(COMMERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection with an open transaction")));
else
{
/*
* Can't send DEBUG log messages to client at this
* point. Since we're disconnecting right away, we
* don't need to restore whereToSendOutput.
*/
whereToSendOutput = DestNone;
ereport(DEBUG1,
(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
errmsg("unexpected EOF on client connection")));
}
return EOF;
}
}
break;
case 'X': /* terminate */
doing_extended_query_message = false;
ignore_till_sync = false;
break;
case 'B': /* bind */
case 'C': /* close */
case 'D': /* describe */
case 'E': /* execute */
case 'H': /* flush */
case 'P': /* parse */
doing_extended_query_message = true;
/* these are only legal in protocol 3 */
if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid frontend message type %d", qtype)));
break;
case 'S': /* sync */
/* stop any active skip-till-Sync */
ignore_till_sync = false;
/* mark not-extended, so that a new error doesn't begin skip */
doing_extended_query_message = false;
/* only legal in protocol 3 */
if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid frontend message type %d", qtype)));
break;
case 'd': /* copy data */
case 'c': /* copy done */
case 'f': /* copy fail */
doing_extended_query_message = false;
/* these are only legal in protocol 3 */
if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid frontend message type %d", qtype)));
break;
default:
/*
* Otherwise we got garbage from the frontend. We treat this as
* fatal because we have probably lost message boundary sync, and
* there's no good way to recover.
*/
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid frontend message type %d", qtype)));
break;
}
/*
* In protocol version 3, all frontend messages have a length word next
* after the type code; we can read the message contents independently of
* the type.
*/
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
{
if (pq_getmessage(inBuf, 0))
return EOF; /* suitable message already logged */
}
else
pq_endmsgread();
RESUME_CANCEL_INTERRUPTS();
return qtype;
}
1.2 消息交互流程
1.2.1 启动流程
客户端首先发送startup packet到服务器,服务器判断是否需要授权信息,需要的话这发送权限认证请求要求客户端发送密码,权限验证后进行一些参数信息传递,最后发送readyforquery表明连接创建成功。
1.2.2 简单查询流程
客户端会发送一个Query消息,服务端处理请求,然后返回查询结果,查询结果包含两部分,一是结构,即RowDescription(如列名,类型,长度);另外一个是数据,即DataRow(一行数据).
每个sql结束后都会返回一个commandcomplete,查询请求结束后回回复一条ReadyForQuery。
1.2.3 扩展查询
扩展查询将一个查询分为多个步骤,客户端先给服务的发送Parse消息,该消息包含参数化sql,服务端收到消息后,调用exec_parse_message 函数进行处理,进行语法分析、语义分析和重写,同时会创建一个 Plan Cache 的结构,用于缓存后续的执行计划;然后客户端发送bind消息,携带具体参数值,服务端收到该消息后,调用 exec_bind_message 函数进行处理;然后客户端发送describe,获取结果的列名,类型等信息;然后客户端发送execute去获取DataRow;最后Sync是扩展协议的请求消息结束标识。
1.2.3.1 pipelining
扩展查询允许使用流水线,即发送的一系列查询无需等待较早查询完成,减少给定系列操作的网络所需往返次数。但是需要考虑一个步骤失败,后续步骤已经发送给服务器的问题。
1.2.4 取消流程
在查询期间,客户端可能会请求取消查询,为了不让服务端在处理时不断检查来自客户端的新输入,取消请求采用的方式是打开一个新连接然后发送CancelRequet请求,而不是StartUp。那么如何知道要取消哪一个请求?依靠的是查询startup阶段,服务端返回给客户端的进程id和一个取消码。
1.2.5 结束流程
正常终止通过客户端发送的terminate终止消息来关闭连接,在异常常见下(数据库关闭),服务段可能没有收到请求就断开连接,在这种常见下,服务端会尝试在关闭连接之前发送错误或通知消息,给出原因。
1.2.6 copy流程
Copy 子协议对应三种模式:
copy-in 导入数据,对应命令 COPY FROM STDIN
copy-out 导出数据,对应命令 COPY TO STDOUT
copy-both 用于 walsender,在主备间批量传输数据
以 copy-in 为例,服务端收到 COPY 命令后,进入 COPY 模式,并回复 CopyInResponse。随后客户端通过 CopyData 消息传输数据,CopyComplete 消息标识数据传输完成,服务端收到该消息后,发送 CommandComplete 和 ReadyForQuery 消息,消息流如下:
1.2.7 错误和通知
错误和通知通过ErrorResponse 和通知消息 NoticeResponse来返回,具体含义如下: