信号驱动式IO指进程预先告知内核,当某个描述符上发生某事时,内核使用信号通知相关进程,它在历史上曾被称为异步IO,但信号驱动式IO不是真正的异步IO,真正的异步IO通常定义为进程执行IO系统调用告知内核启动某个IO操作,内核启动IO操作后立即返回到进程,进程在IO操作发生期间继续执行,当操作完成或遇到错误时,内核以进程在IO系统调用中指定的某种方式通知进程。
非阻塞式IO也不是真正的异步IO,对于非阻塞式IO,内核一旦启动IO操作就不像异步IO那样立即返回,而是等IO操作完成或遇到错误再返回。非阻塞式IO中,内核立即返回的唯一条件是IO操作的完成需要把进程投入睡眠,这种情况内核不启动IO操作。
POSIX通过aio_XXX函数提供真正的异步IO,这些函数允许进程指定IO操作完成时是否产生信号,以及产生什么信号。
源自Berkeley的实现使用SIGIO信号支持套接字和终端设备上的信号驱动式IO,SVR 4使用SIGPOLL信号支持流设备上的信号驱动式IO,且SIGPOLL与SIGIO值相同。
针对一个套接字使用信号驱动式IO要求进程执行以下步骤:
1.建立SIGIO信号的信号处理函数。
2.设置该套接字的属主,通常使用fcntl函数的F_SETOWN命令设置。
3.开启该套接字的信号驱动式IO,通常通过fcntl函数的F_SETFL命令打开O_ASYNC标志来完成。
O_ASYNC标志是相对较晚加入到POSIX规范中的,支持该标志的系统不多见,因此我们也可用ioctl函数的FIOASYNC请求开启信号驱动式IO。POSIX选用的名字并不恰当,使用O_SIGIO作为此标志名更好。
我们应在设置套接字属主前建立信号处理函数,在源自Berkeley的实现中,这两个步骤的调用顺序无关紧要,因为SIGIO的默认行为是忽略该信号,即使颠倒调用顺序,在调用fcntl后,调用signal前有较小机会产生SIGIO信号,此时信号只是被丢弃。但在SVR 4中,头文件sys/signal.h把SIGIO定义为SIGPOLL,而SIGPOLL的默认行为是终止进程,因此在SVR 4中,我们必须先安装信号处理函数,再设置套接字属主。
很容易把一个套接字设置成以信号驱动式IO模式工作,但哪些条件导致内核产生递交给套接字属主的SIGIO信号取决于具体协议。
对于UDP,SIGIO信号在发生以下事件时内核通过调用sorwakeup产生:
1.数据报到达套接字。
2.套接字上发生异步错误。
当捕获对于某UDP套接字的SIGIO信号时,我们调用recvfrom读入到达的数据或获取发生的异步错误。UDP发生异步错误的前提是UDP套接字已连接。
不幸的是,信号驱动式IO对TCP套接字近乎无用,因为该信号产生得过于频繁,且它的出现并没有告诉我们发生了什么事件,以下情况均导致对于一个TCP套接字产生SIGIO信号(假设该套接字的信号驱动式IO已开启):
1.监听套接字上某连接请求已完成。
2.某个断联请求已发起。
3.某个断联请求已完成。
4.某个连接已关闭一半。
5.数据到达套接字。
6.数据已从套接字发送走(即收到了已发送数据的确认,输出缓冲区有了空闲空间)。
7.发生某异步错误。
例如,如果一个进程读写同一套接字,那么当有数据到达或以前写的数据得到确认时,SIGIO信号均会产生,且信号处理函数中无法区分这两种情况,如果SIGIO用于这种情形,则TCP套接字应设为非阻塞式,以防read或write函数发生阻塞。我们应考虑只对监听TCP套接字使用SIGIO,因为对于监听套接字,产生SIGIO的唯一条件是某个新连接完成。
作者能找到的信号驱动式IO的唯一现实用途是基于UDP的NTP服务器程序,服务器主循环接收来自客户的一个请求数据报并发送回一个应答数据报,但对于每个客户请求,其工作量不能忽略,对服务器而言,重要的是为每个收到的数据报记录到达时间戳,该值将返回给客户,由客户计算到服务器的RTT。以下是构建这样的UDP服务器的两种方式:
大多UDP服务器都设计成上图左侧所示方式(包括我们的UDP回射服务器),但NTP服务器采用上图右侧所示的方式,当一个新数据报到达时,SIGIO信号处理函数读入该数据报,同时记录它的到达时刻,然后将它置于进程内的另一队列中,以便服务器主循环移走并处理。尽管这样做使服务器代码变复杂了,但可以精确获取数据报达到的时间戳。
进程可通过设置IP_RECVDSTADDR套接字选项获取所收取UDP数据报的目的地址,有人认为,对于所接收UDP数据报还应返回另两个信息:接收接口(如果主机采用普遍的弱端系统模型,则接收接口和目的地址可能不一致)和数据报到达时刻。
对于IPv6,IPV6_PKTINFO套接字选项返回接收接口,对于IPv4,IP_RECVIF套接字选项也返回接收接口。
FreeBSD还提供SO_TIMESTAMP套接字选项,它在一个timeval结构中以辅助数据的形式返回数据报的接收时刻。Linux则提供SIOCGSTAMP ioctl,它返回一个含有数据报接收时刻的timeval结构。
现给出一个类似上图右侧的例子,使用SIGIO信号接收到达数据报的UDP回射服务器程序,客户程序不用改动,服务器的main函数也不用改动,只需改动dg_echo函数。以下是UDP回射服务器的全局声明:
#include "unp.h"
static int sockfd;
#define QSIZE 8 /* size of input queue */
#define MAXDG 4096 /* max datagram size */
// SIGIO信号处理函数把到达的数据报放入一个队列,该队列是一个DG结构数组,我们把它作为一个环形缓冲区处理
// 每个DG结构包含指向所收取数据报的指针、该数据报长度、指向含有客户协议地址的套接字地址结构的指针、该协议地址的大小
typedef struct {
void *dg_data; /* ptr to actual datagram */
size_t dg_len; /* length of datagram */
struct sockaddr *dg_sa; /* ptr to sockaddr{} w/client's address */
socklen_t dg_salen; /* length of sockaddr{} */
} DG;
// 静态分配QSIZE个DG结构
static DG dg[QSIZE]; /* queue of datagrams to process */
static long cntread[QSIZE + 1]; /* diagnostic counter */
static int iget; /* next one for main loop to process,主循环将处理的下一个数组元素下标 */
static int iput; /* next one for signal handler to read into,信号处理函数将存放到的下一个数组元素下标 */
static int nqueue; /* # on queue for main loop to process,队列中供主循环处理的数据报总数 */
static socklen_t clilen; /* max length of sockaddr{} */
static void sig_io(int);
static void sig_hup(int);
下图是DG数组的一个例子,其中第一个元素指向一个150字节的数据报,与它关联的套接字地址长度为16:
以下是dg_echo函数,该函数与以上全局声明放在同一文件中:
void dg_echo(int sockfd_arg, SA *pcliaddr, socklen_t clilen_arg) {
int i;
const int on = 1;
sigset_t zeromask, newmask, oldmask;
// 把套接字保存在一个全局变量中,因为信号处理函数里也要用到它
sockfd = sockfd_arg;
clilen = clilen_arg;
// 初始化已接收数据报队列
for (i = 0; i < QSIZE; ++i) { /* init queue of buffers */
dg[i].dg_data = Malloc(MAXDG);
dg[i].dg_sa = Malloc(clilen);
dg[i].dg_salen = clilen;
}
iget = iput = nqueue = 0;
// 为SIGHUP(用于诊断目的)和SIGIO建立信号处理函数
Signal(SIGHUP, sig_hup);
Signal(SIGIO, sig_io);
// 设置套接字属主
Fcntl(sockfd, F_SETOWN, getpid());
// 设置信号驱动IO,上面提到过,fcntl的O_ASYNC标志是POSIX设置信号驱动式IO的方式
// 但由于大多系统还不支持它,我们改用ioctl函数
Ioctl(sockfd, FIOASYNC, &on);
// 设置非阻塞式IO,尽管大多系统支持使用fcntl函数O_NONBLOCK标志设置非阻塞式IO
// 但此处我们仍使用ioctl函数
Ioctl(sockfd, FIONBIO, &on);
Sigemptyset(&zeromask); /* init three signal sets */
// oldmask用来记录阻塞SIGIO时,原来的信号掩码
Sigemptyset(&oldmask);
Sigemptyset(&newmask);
Sigaddset(&newmask, SIGIO); /* signal we want to block */
// 把进程的当前信号掩码保存到oldmask中,然后把newmask逻辑或到当前信号掩码
Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
for (; ; ) {
while (nqueue == 0) {
// sigsuspend函数先保存当前信号掩码,再把当前掩码设置为其参数,此例中是zeromask
// 即不阻塞任何信号,sigsuspend函数在进程捕获一个信号且从该信号的处理函数返回后才返回
// sigsuspend函数总是返回EINTR错误,在返回前它总是会把当前信号掩码恢复为调用时的值
// 在本例中就是恢复为newmask的值,从而确保sigsuspend函数返回后SIGIO继续被阻塞
// 如果sigsuspend函数返回后SIGIO信号未被阻塞,我们会进入下一次while循环的条件判断
// 可能在测试时我们发现nqueue为0,但刚测试完SIGIO信号就被递交了,导致nqueue为1
// 然后我们才调用sigsuspend进入睡眠,这样就错过了这个信号,除非另有信号发生
// 否则我们将永远阻塞在sigsuspend函数处
sigsuspend(&zeromask); /* wait for datagram to process */
}
/* unblock SIGIO */
Sigprocmask(SIG_SETMASK, &oldmask, NULL);
Sendto(sockfd, dg[iget].dg_data, dg[iget].dg_len, 0, dg[iget].dg_sa, dg[iget].dg_salen);
// 修改iget时不用阻塞SIGIO,因为只有主循环使用iget,信号处理函数不改动它
if (++iget >= QSIZE) {
iget = 0;
}
/* block SIGIO */
// 修改nqueue前必须阻塞SIGIO,因为主循环和信号处理函数都会改变它
// 另外我们在循环顶部测试nqueue时也需要SIGIO阻塞着
// 我们也可以去掉主循环中的两个sigprocmask函数,省得解阻塞SIGIO后又再阻塞它
// 但这么做会导致整个循环期间SIGIO一直阻塞着,从而降低了信号处理函数的及时性
// 这么做不会导致数据报的丢失(假设套接字接收缓冲区足够大),但SIGIO信号的递送在阻塞期间一直被拖延
// 编写执行信号处理的应用时,我们应尽可能减少阻塞信号的时间
Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
nqueue--;
}
}
以下是SIGIO的信号处理函数,它也应和全局声明放到同一文件中:
static void sig_io(int signo) {
ssize_t len;
int nread;
DG *ptr;
for (nread = 0; ; ) {
// 如果DG结构数组队列已满,进程就终止,处理这种情况有更合适的方法,如分配额外缓冲区
// 但就我们的简单例子而言不如直接终止进程
if (nqueue >= QSIZE) {
err_quit("receive overflow");
}
ptr = &dg[iput];
ptr->dg_salen = clilen;
len = recvfrom(sockfd, ptr->dg_data, MAXDG, 0, ptr->dg_sa, &ptr->dg_salen);
if (len < 0) {
if (errno == EWOULDBLOCK) {
break; /* all done; no more queued to read */
} else {
err_sys("recvfrom error");
}
}
ptr->dg_len = len;
++nread;
++nqueue;
if (++iput >= QSIZE) {
iput = 0;
}
}
// cntread是每次SIGIO信号读入的数据报数量直方图
// SIGHUP信号被递交时,在其信号处理函数中将其显示为诊断信息
cntread[nread]++; /* histogram of # datagrams read per signal */
}
编写以上SIGIO信号处理函数遇到的问题是POSIX信号通常不排队,如果我们正在执行信号处理函数,期间SIGIO信号会被阻塞,如果期间SIGIO信号又发生了2次,则期间发生的这两次SIGIO信号之后只会再递送一次。
POSIX提供一些排队的实时信号,但SIGIO等信号通常不排队。
考虑以下情形:一个数据报到达导致SIGIO被递交,它的信号处理函数读入该数据报并把它放到供主循环读取的队列中,但在信号处理函数执行期间,又有两个数据报到达,导致SIGIO再产生2次,由于SIGIO被阻塞,当它的信号处理函数返回时,该处理函数仅再被调用一次,该信号处理函数的第二次执行读入第二个数据报,第三个数据报仍会留在套接字接收队列,第三个数据报被读入的条件是第四个数据报到达,当第四个数据报到达时,被读入并放到供主循环读取的队列中的是第三个而非第四个数据报。
既然信号是不排队的,开启信号驱动式IO的描述符通常也被设为非阻塞式,这样我们就可以把SIGIO信号的处理函数编写成在一个循环中执行读入操作,直到该操作返回EWOULDBLOCK。
主循环有另一种有问题的写法:
for (; ; ) {
Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
while (nqueue == 0) {
sigsuspend(&zeromask); /* wait for datagram to process */
}
--nqueue;
/* unblock SIGIO */
Sigprocmask(SIG_SETMASK, &oldmask, NULL);
Sendto(sockfd, dg[iget].dg_data, dg[iget].dg_len, 0, dg[iget].dg_sa, dg[iget].dg_salen);
if (++iget >= QSIZE) {
iget = 0;
}
}
以上写法的错误在于,nqueue是在回射数组元素dg[iget]前递减的,这可能导致信号处理回射把新的数据报从套接字读出来并存到这个数组元素。如当前DG数组已满,iget和iput指向同一元素,此时信号处理函数中nqueue的值为QSIZE-1
,从而不会分配更多空间存放数据报,而是在当前iput处存放收到的套接字,导致iget指向的元素还未使用就被覆盖。
以下是SIGHUP的信号处理函数,它显示cntread数组内容,该函数也应和全局声明放在一个文件中:
static void sig_hup(int signo) {
int i;
for (i = 0; i <= QSIZE; ++i) {
printf("cntread[%d] = %ld\n", i, cntread[i]);
}
}
为了说明信号是不排队的,且除了设置套接字的信号驱动式IO标志外,还必须把套接字设置为非阻塞式,我们与6个客户一起运行以上回射服务器,每个客户发送3645行让服务器回射的文本(即3645个数据报,每个数据报是一行),且每个客户都从同一个shell脚本以后台方式启动,从而使所有客户几乎在同一时刻启动。所有客户终止后,我们向服务器发送SIGHUP信号,显示cntread数组内容:
大多情况下信号处理函数每次被调用只读入一个数据报,但有些情况会读入多个数据报。cntread[0]计数器不为0是可能的:SIGIO信号在其信号处理函数执行时产生,且在信号处理函数的本次执行中就预先读入了这些信号对应的数据报,当信号处理函数因这些信号的再次递交而被调用时,已经没有剩余的数据报可读了。最后我们验证该数组元素的加权总和等于6个客户发送的文本行数:15899*1+2099*2+515*3+57+4=6*3645=21870
。
信号驱动式IO就是让内核在套接字上发生某事时使用SIGIO信号通知进程:
1.对于已连接TCP套接字,有很多情况都会导致该通知,反而使这个特性几近无用。
2.对于监听TCP套接字,这种通知发生在有一个新连接准备好被接受时。
3.对于UDP套接字,这种通知意味着一个数据报或异步错误到达,这两种情况我们都调用recvfrom。