网络程序需要处理定时事件,如定期检测一个客户连接的活动状态。服务器程序通常管理着众多定时事件,有效地组织这些定时事件,使其在预期的时间被触发且不影响服务器的主要逻辑,对于服务器的性能有至关重要的影响。为此,我们要将每个定时事件分别封装成定时器,并使用某种容器类数据结构,如链表、排序链表、时间轮,将所有定时器串联起来,以实现对定时事件的统一管理。本章主要讨论两种高效的管理定时器的容器:时间轮和时间堆。
定时指在一段时间后触发某段代码的机制,我们可以在这段代码中依次处理所有到期的定时器,即定时机制是定时器得以被处理的原动力。Linux提供三种定时方法:
- socket套接字选项
SO_RCVTIMEO
和SO_SNDTIMEO
; SIGALRM
信号;- I/O复用系统调用的超时参数。
socket 选项 SO_RCVTIMEO
和 SO_SNDTIMEO
SO_RCVTIMEO
设置 socket 接收数据超时时间。
SO_SNDTIMEO
设置 socket 发送数据超时时间。
这两个数据仅对与数据接收和发送相关的 socket 系统调用 send
、sendmsg
、recv
、recvmsg
、accept
和 connect
。
在程序中,我们根据上述系统调用的返回值以及 errno
来判断超时时间是否已到,进而决定是否开始处理定时任务。
实战 5.使用 SO_SNDTIMEO
选项设置定时
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <libgen.h>
/* 超时连接函数
* 这个函数的作用是尝试与指定IP地址和端口的服务器建立连接,
* 并设置一个连接超时时间。
*/
int timeout_connect(const char *ip, int port, int time) {
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int sockfd = socket(PF_INET, SOCK_STREAM, 0);
assert(sockfd >= 0);
// SO_RCVTIMEO和SO_SNDTIMEO套接字选项对应的值类型为timeval,这和select函数的超时参数类型相同
struct timeval timeout;
timeout.tv_sec = time;
timeout.tv_usec = 0;
socklen_t len = sizeof(timeout);
ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);
assert(ret != -1);
/*
* 如果连接在指定时间内没有成功建立,connect() 函数将返回 -1,
* 并且 errno 会被设置为 EINPROGRESS,表示连接超时。
*/
printf("Attempting to connect...\n");
ret = connect(sockfd, (struct sockaddr *)&address, sizeof(address));
printf("Connect returned: %d\n", ret);
if (ret == -1) {
// 超时对应的错误号是EINPROGRESS,此时就可执行定时任务了
if (errno == EINPROGRESS) {
printf("conencting timeout, process timeout logic\n");
return -1;
}
printf("error occur when connecting to server\n");
return -1;
}
return sockfd;
}
int main(int argc, char *argv[]) {
if (argc != 3) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
/* 超时时间 10s */
int sockfd = timeout_connect(ip, port, 10);
if (sockfd < 0) {
return 1;
}
return 0;
}
编译:
g++ -o client connect_timeout.cpp
运行:
./client 192.168.100.100 8080 /* 这是一个无效的IP地址 */
结果:
SIGALRM 信号
由alarm
和setitimer
函数设置的实时闹钟一旦超时,就会触发SIGALRM
信号,我们可以使用该信号的信号处理函数来处理定时任务。
基于升序链表的定时器
定时器通常至少要包含两个成员:一个超时时间(相对时间或绝对时间)和一个任务回调函数。有时还可能包含回调函数被执行时需要传入的参数,以及是否重启定时器等信息。如果使用链表作为容器来串联所有定时器,则每个定时器还要包含指向下一定时器的指针成员,如果链表是双向的,则每个定时器还需要包含指向前一个定时器的指针成员。
从执行效率来看,添加定时器的时间复杂度是O(n)
,删除定时器的时间复杂度是O(1)
,执行定时任务的时间复杂度平均是O(1)
。
处理非活动连接
现在考虑以上升序定时器链表的实际应用 ——处理非活动连接。服务器进程通常要定期处理非活动连接:给客户端发一个重连请求,或关闭该连接,或者其他。
Linux在内核中提供了对连接是否处于活动状态的定期检查机制,我们可通过socket选项KEEPALIVE
来激活它。我们考虑在应用层实现类似KEEPALIVE
的机制,以管理所有长时间处于非活动状态的连接。
实战 6. 利用 alarm 函数周期性触发 SIGALRM 信号
如以下代码利用alarm
函数周期性地触发SIGALRM
信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务——关闭非活动的连接。
服务器端
TIMESLOT
定义了定时器的基本时间单位,用于设定定时器回调函数的调用频率。这个值代表了服务器在触发 SIGALRM
信号后多长时间再次触发该信号,从而执行定时任务。在代码示例中,TIMESLOT
被设置为 5 秒,也就是说,基本时间单位是 5 秒。
alarm
函数用于安排信号在未来的某个时间点发送给进程。这个函数是标准库中的一部分,定义在 <unistd.h>
头文件中。当调用 alarm(TIMESLOT);
时,设定了一个闹钟,让操作系统在 TIMESLOT
秒后向当前进程发送一个 SIGALRM
信号。
alarm(TIMESLOT);
在 timer_handler()
函数中调用,意味着每隔 TIMESLOT
秒,操作系统将向进程发送一个 SIGALRM
信号。收到这个信号后,sig_handler
处理函数将被执行,进而调用 timer_handler
函数来处理所有定时任务。
当一个新的客户端连接被接受,服务器为这个连接创建一个定时器,并设置其超时时间:
timer->expire = cur + 3 * TIMESLOT;
cur
是当前时间,3 * TIMESLOT
表示定时器的超时时间是15秒(因为 3 乘以 5秒)。
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <libgen.h>
#include "lst_timer.h"
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5
static int pipefd[2];
static sort_timer_lst timer_lst; /* 用升序链表来管理定时器 */
static int epollfd = 0;
/* 将文件描述符设置为非阻塞模式 */
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
/* addfd 函数把新的文件描述符添加到 epoll 监听列表中,
* 并且设置为非阻塞和边缘触发模式(EPOLLET),以提高事件处理的效率。
*/
void addfd(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); /* 将fd注册到内核事件表epollfd中*/
setnonblocking(fd);
}
/* sig_handler 用于处理程序接收到的信号。
* 它把信号编号发送到 pipefd 管道,这样主循环可以安全地处理信号事件。
* 这种使用信号和管道的组合是多线程或多进程环境中处理信号的一种安全模式。
*/
void sig_handler(int sig) {
int save_errno = errno;
int msg = sig;
// 此处还是老bug,没有考虑字节序就发送了int的低地址的1字节
send(pipefd[1], (char *)&msg, 1, 0);
errno = save_errno;
}
/* addsig 函数设置信号处理函数,并确保系统调用被中断时能自动重启,避免了部分系统调用失败的问题。 */
void addsig(int sig) {
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
void timer_handler() {
// 处理定时任务
timer_lst.tick();
// 由于alarm函数只会引起一次SIGALRM信号,因此重新定时,以不断触发SIGALRM信号
alarm(TIMESLOT);
}
/* 定时器回调函数,它删除非活动连接socket上的注册事件,并关闭之 */
void cb_func(client_data *user_data) {
epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);
close(user_data->sockfd);
printf("close fd %d\n", user_data->sockfd);
}
int main(int argc, char *argv[]) {
if (argc != 3) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5); /* 创建内核事件表 */
assert(epollfd != -1);
addfd(epollfd, listenfd);
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
assert(ret != -1);
setnonblocking(pipefd[1]); /* fd[1]只能用于数据写入。 */
addfd(epollfd, pipefd[0]);
// 设置信号处理函数
addsig(SIGALRM);
addsig(SIGTERM);
bool stop_server = false;
// 直接初始化FD_LIMIT个client_data对象,其数组索引是文件描述符
client_data *users = new client_data[FD_LIMIT];
bool timeout = false;
// 定时
alarm(TIMESLOT);
while (!stop_server) {
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); /* 它在一段超时时间内等待一组文件描述符上的事件 */
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}
for (int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd;
// 处理新到的客户连接
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
addfd(epollfd, connfd);
users[connfd].address = client_address;
users[connfd].sockfd = connfd;
// 创建一个定时器,设置其回调函数和超时时间,然后绑定定时器和用户数据,并将定时器添加到timer_lst中
util_timer *timer = new util_timer;
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
users[connfd].timer = timer;
timer_lst.add_timer(timer);
// 处理信号
} else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) {
int sig;
char signals[1024];
ret = recv(pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) {
// handle the error
continue;
} else if (ret == 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGALRM:
// 先标记为有定时任务,因为定时任务优先级比IO事件低,我们优先处理其他更重要的任务
timeout = true;
break;
case SIGTERM:
stop_server = true;
break;
}
}
}
// 从客户连接上接收到数据
} else if (events[i].events & EPOLLIN) {
memset(users[sockfd].buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd);
util_timer *timer = users[sockfd].timer;
if (ret < 0) {
// 如果发生读错误,则关闭连接,并移除对应的定时器
if (errno != EAGAIN) {
cb_func(&users[sockfd]);
if (timer) {
timer_lst.del_timer(timer);
}
}
} else if (ret == 0) {
// 如果对方关闭连接,则我们也关闭连接,并移除对应的定时器
cb_func(&users[sockfd]);
if (timer) {
timer_lst.del_timer(timer);
}
} else {
// 如果客户连接上读到了数据,则调整该连接对应的定时器,以延迟该连接被关闭的时间
if (timer) {
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
printf("adjust timer once\n");
timer_lst.adjust_timer(timer);
}
}
}
}
// 最后处理定时事件,因为IO事件的优先级更高,但这样会导致定时任务不能精确按预期的时间执行
if (timeout) {
timer_handler();
timeout = false;
}
}
close(listenfd);
close(pipefd[1]);
close(pipefd[0]);
delete[] users;
return 0;
}
客户端
为了测试服务器能够按照预定的定时器逻辑关闭非活动连接,我们可以编写一个简单的客户端程序,该程序连接到服务器后不发送任何数据,仅保持连接一定时间后(sleep(20);
)关闭,看服务器是否会在设定的超时时间后自动关闭该连接。
服务端中,定时器的超时时间被设置为3 * TIMESLOT
,即15秒(TIMESLOT
设置为5秒)。定时器的目的是在客户端在指定时间内没有任何活动(例如数据交换)的情况下自动关闭该连接。
SIGALRM
信号每次触发就在其信号处理函数(如果使用统一事件源,则是主函数)中执行一次tick
函数,处理链表上的到期任务,并在终端打印"timer tick"。
void timer_handler() {
/* 处理定时任务 */
timer_lst.tick();
/* 由于alarm函数只会引起一次SIGALRM信号,因此重新定时,以不断触发SIGALRM信号 */
alarm(TIMESLOT);
}
在服务器端,在没有收到任何数据的情况下,服务器在定时器到期后关闭了连接的日志消息,例如 "close fd <fd>
" 的输出,表示服务器正确处理了非活动连接。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int main(int argc, char *argv[]) {
if (argc != 3) {
printf("Usage: %s <ip> <port>\n", argv[0]);
return 1;
}
const char *server_ip = argv[1];
int server_port = atoi(argv[2]);
// 创建 socket
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
perror("Socket creation failed");
return 1;
}
// 服务器地址结构
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {
perror("Invalid address/ Address not supported");
return 1;
}
// 连接到服务器
if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("Connection Failed");
return 1;
}
printf("Connected to server, now sleeping...\n");
// 保持连接,但不进行任何通信
sleep(20); // 保持连接20秒,调整时间以匹配服务器设置的超时时间
// 关闭 socket
close(sock);
printf("Disconnected from server\n");
return 0;
}
效果
I/O 复用系统调用的超时函数
Linux下的3组I/O复用系统调用都带有超时参数,因此它们不仅能统一处理信号(通过管道在信号处理函数中通知主进程)和I/O事件,也能统一处理定时事件,但由于I/O复用系统调用可能在超时时间到期前就返回(有I/O事件发生),所以如果我们要利用它们来定时,就需要不断更新定时参数以反映剩余的时间,如下代码所示:
#define TIMEOUT 5000
int timeout = TIMEOUT;
time_t start = time(NULL);
time_t end = time(NULL);
while (1) {
printf("the timeout is now %d mil-seconds\n", timeout);
start = time(NULL);
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}
// 如果epoll_wait函数返回0,说明超时时间到,此时可处理定时任务,并重置定时时间
if (number == 0) {
// 处理定时任务
timeout = TIMEOUT;
continue;
}
// 到此,epoll_wait函数的返回值大于0,
end = time(NULL);
// 更新timeout的值为减去本次epoll_wait调用的持续时间
timeout -= (end - start) * 1000;
// 重新计算后的timeout值可能是0,说明本次epoll_wait调用返回时,不仅有文件描述符就绪,且其超时时间也刚好到达
// 此时我们要处理定时任务,并充值定时时间
if (timeout <= 0) {
// 处理定时任务
timeout = TIMEOUT;
}
// handle connections
}
time_t time(time_t *t);
函数原型:
time_t time(time_t *t);
参数
t
: 这是一个指向time_t
类型的指针,用于存储时间值。如果参数是NULL
,函数只返回当前时间。
返回值
- 函数返回一个
time_t
类型的值,它表示从Epoch至今的秒数。如果出现错误,则返回(time_t) -1
。
高性能定时器
时间轮
基于排序链表的定时器存在一个问题:添加定时器的效率偏低(添加定时器的时间复杂度是O(n)
)。下面讨论的时间轮解决了这个问题,一种简单的时间轮如下图:
上图中,时间轮内部的实线指针指向轮子上的一个槽(slot),它以恒定的速度顺时针转动,每转动一步就指向下一个槽(虚线指针所指的槽),每次转动称为一个滴答(tick),一个滴答的时间称为时间轮的槽间隔si(slot interval),它实际上就是心搏时间。上图中的时间轮共有N个槽,因此它转动一周的时间是N * si
。每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时事件相差N * si
的整数倍,时间轮正是利用这个关系将定时器散列到不同的链表中。假如现在指针指向槽cs
,我们要添加一个定时事件为ti
的定时器,则该定时器将被插入槽ts
(timer slot)对应的链表中:
基于排序链表的定时器使用唯一的一条链表来管理所有定时器,所以插入操作的效率随着定时器数目的增多而降低,而时间轮使用哈希表的思想,将定时器散列到不同的链表上,这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入操作的效率基本不受定时器数目的影响。
对时间轮而言,要想提高定时精度,就要使si
足够小,要提高执行效率,就要求N足够大(N越大,散列冲突的概率就越小)。
以下代码描述了一种简单的时间轮,因为它只有一个轮子,而复杂的时间轮可能有多个轮子,不同的轮子有不同的粒度,相邻的两个轮子,精度高的转一圈,精度低的仅仅往前移动一槽,就像水表一样。
对时间轮而言,如果一共有n个定时器,则添加一个定时器的时间复杂度为O(1)
;删除一个定时器的时间复杂度平均也是O(1)
,但最坏情况下可能所有节点都在一个槽中,此时删除定时器的时间复杂度为O(n)
;执行一个定时器的时间复杂度是O(n)
,实际上执行一个定时器任务的效率要比O(n)
好得多,因为时间轮将所有定时器散列到了不同的链表上,时间轮的槽越多,等价于散列表的入口(entry)越多,从而每条链表上的定时器数量越少。此外,以上代码中只使用了1个时间轮,当使用多个轮子来实现时间轮时,执行一个定时器任务的时间复杂度将接近O(1)
。
时间堆
以上讨论的定时方案都是以固定频率调用心搏函数tick
,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数。设计定时器的另一种思路是:将所有定时器中超时时间最小的定时器的超时值作为心搏间隔,这样,一旦心搏函数tick
被调用,超时时间最小的定时器必然到期,我们就可在tick
函数中处理该定时器,然后,再次从剩余定时器中找出超时时间最小的一个,并将这段最小时间设为下一次心搏间隔。
最小堆很适合这种定时方案:
最小堆是每个节点的值都小于或等于其子节点的值的完全二叉树
由于最小堆其实就是一棵树,所以其实现可以用链表,也可以用数组。用最小堆实现的定时器称为时间堆。对时间堆而言,添加一个定时器的时间复杂度是O(lgn)
;删除一个定时器的时间复杂度是O(1)
;执行一个定时器的时间复杂度是O(1)
。因此,时间堆的效率很高。