高性能定时器
时间轮
由于排序链表定时器容器有这样一个问题:添加定时器的效率偏低。而即将介绍的时间轮则解决了这个问题。一种简单的时间轮如下所示。
如图所示的时间轮内,指针指向轮子上的一个slot(槽), 它以恒定的时间顺时针转动,每转动一步指向下一个槽,每次转动就称为一个滴答即tick()
。一个tick的时间成为时间轮的槽间隔时间si(slot interval)。该时间轮共有N个槽,因此它运转一周的时间为 N * si
。
每个槽指向一条定时器链表,每条链表上的定时器都具有相同的特征:它们的定时时间相差 N * si 的整数倍。
假如现在指针指向槽cs(current slot), 添加一个定时时间为ti的定时器,则该定时器将被插入槽 ts(timer slot)对应的链表中:
ts = (cs + (ti / si)) % N
基于排序链表的定时器使用唯一的一条链表来管理所有定时器,所以插入操作随着定时器数目的增多而降低。
时间轮使用哈希表的思想,将定时器散列到不同的链表上。 这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目。
对于时间轮而言,要提高定时精度,就要使**si (槽间隔)**值足够小;
要提高执行效率,则要求**N(槽数量)**值足够大。
如下是一个简单的时间轮代码,本上还是以链表的操作为主
// 时间轮定时器,用数组(环)存储每一条定时器链表
// 求hash(超时时间)决定定时器到数组的哪个位置
#ifndef TIME_WHEEL_TIMER
#define TIME_WHEEL_TIMER
#include <time.h>
#include <netinet/in.h>
#include <stdio.h>
#define BUFFER_SIZE 64
class tw_timer; // 时间轮定时器前置声明
// 绑定socket和定时器
struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
tw_timer *timer;
}
class tw_timer
{
public:
tw_timer(int rot, int ts)
:next(NULL), prev(NULL), rotation(rot), time_slot(ts){}
public:
int rotation; // 记录定时器在时间轮转多少圈才生效
int time_slot; // 记录定时器属于时间轮上的哪个slot
void (*cb_func)(client_data *);
client_data *user_data; // 用指针存储对应的用户数据
tw_timer *next;
tw_timer *prev;
}
class time_wheel
{
public:
time_wheel() : cur_slot(0)
{
for(int i = 0; i < N; ++i)
{
slots[i] = NULL; // init
}
}
~time_wheel()
{
for(int i = 0; i < N; ++i)
{
tw_timer *tmp = slots[i];
while(tmp)
{
// 重新设置链表头节点
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}
}
// 根据定时值timeout,创建一个定时器,并把它插入到合适的槽中
tw_timer* add_timer(int timeout)
{
if(timeout < 0)
{
return NULL;
}
int ticks = 0;
// 根据待插入定时器的超时值计算它将在时间轮转动多少个滴答后被触发
// 若待插入定时器的超时值小于时间轮的槽间隔,则将ticks向上取整为1
if(timeout < SI)
{
ticks = 1;
}
else
{
ticks = timeout / SI;
}
// 计算转多少圈后被触发
int rotation = ticks / N;
// 计算待插入的定时器应该被插入哪个槽中
int ts = (cur_slot + (ticks % N)) % N;
// 创建新的定时器,它在时间轮转动rotation圈之后触发,位于第ts个slot上
tw_timer *timer = new tw_timer(rotation, ts);
// 插入指定槽中的链表 头
// 第ts个slot没有任何定时器(空链表)
if(!slots[ts])
{
printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n", rotation, ts, cur_slot);
}
else // 链表非空,则头插
{
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}
// 删除目标定时器
void del_timer(tw_timer *timer)
{
if(!timer)
{
return;
}
int ts = timer->time_slot;
// slots[ts] 是目标定时器所在头节点
// 如果待删定时器就是该头节点,则需要重置第ts个slot的链表头节点
if(timer == slots[ts])
{
slots[ts] = slots[ts]->next;
if(slots[ts])
{
slots[ts]->prev = NULL;
}
// 如果第ts个slot的链表就剩下一个节点,直接删除
delete timer;
}
else
{
timer->prev->next = timer->next;
if(timer->next)
{
timer->next->prev = timer->prev;
}
delete timer;
}
}
// 时间轮转动函数,每SI时间之后,cur_slot向前滚动一个slot
void tick()
{
// 时间轮上当前槽的头节点
tw_timer *tmp = slots[cur_slot];
printf("current slot is %d\n", cur_slot);
// 遍历当前slot上链表的每个定时器节点,
while(tmp)
{
printf("tick the timer once\n");
// 定时器超过1轮,跳过
if(tmp->rotation > 0)
{
tmp->rotation--;
tmp = tmp->next;
}
// 否则只要指针到当前slot,里面的所有定时器就都到时了
else // 执行定时任务,删除tmp节点
{
tmp->cb_func(tmp->user_data);
// 链表头节点!!
if(tmp = slots[cur_slot])
{
printf("delete header in cur_slot\n");
slots[cur_slot] = tmp->next; // 让tmp的下一个节点做头节点
delete tmp;
if(slots[cur_slot])
{
slots[cur_slot]->prev = tmp->prev;
}
tmp = slots[cur_slot]; // tmp为刚刚删除的节点的下一个节点
}
else // 非头节点
{
tmp->prev->next = tmp->next;
if(tmp->next)
{
tmp->next->prev = tmp->prev;
}
tw_timer *tmp2 = tmp->next;
delete tmp;
tmp = tmp2; //
}
}
}
// 时间轮转动(指针移动到下一个slot)
cur_slot = (++cur_slot) % N;
}
private:
// 时间轮上slot的数目
static const int N = 60;
// 指针每1s转动一次,即slot的间隔为1s slot interval
static const int SI = 1;
// 时间轮,每个存放定时器链表
tw_timer* slots[N];
int cur_slot; // 指针,指向当前的slot
};
复杂度分析
由于添加一个定时器是链表头插,则时间复杂度为 O ( 1 ) O(1) O(1)
删除一个定时器的时间复杂的也为 O ( 1 ) O(1) O(1)
执行一个定时器的复杂度为 O ( n ) O(n) O(n).但实际执行一个定时任务效率要比 O ( n ) O(n) O(n)好,因为时间轮将所有定时器散列到不同的链表上。
若使用多个轮子实现时间轮,执行一个定时器任务的复杂度可以降到 O ( 1 ) O(1) O(1)
时间堆
前面讨论的定时器方案都是以固定的频率调用定时处理函数tick()
,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数。
设计定时器的另一种思路是:将所有定时器中超时时间最小的一个定时器的超时时间作为调用tick()
的间隔时间。 这样,一旦tick()
函数被调用,超时时间最小的定时器必然到期,我们就可以在tick()
函数中处理该定时器。
然后再从剩余的定时器中找出超时时间最小的一个,并将这段最小时间设置为下一次tick()
间隔。
最小堆很适合这种定时方案。本文实现最小堆有以下关键特点:
-
根节点值小于其孩子节点的值(递归成立);
-
插入节点是在最后一个节点添加新节点,然后进行上滤保证最小堆特性;
-
删除节点是删除其根节点上的元素,然后把最后一个元素移动到根节点,进行下滤操作保证最小堆特性;
-
将N个元素的数组(普通二叉树)初始化为最小堆,即从二叉树最后一个非叶节点到根节点(第 [ ( N − 1 ) / 2 ] [(N-1) / 2] [(N−1)/2] ~ 0 个元素)执行下滤操作。
-
本文实现的最小堆底层是用数组进行存储,是一个适配器,联想C++的
priority_queue<int, vector<int>, greater<int>>
最小堆代码实现如下
// 用最小堆存储定时器,称为时间堆
#ifndef MIN_HEAP
#define MIN_HEAP
#include <iostream>
#include <netinet/in.h>
#include <time.h>
using std::exception;
#define BUFFER_SIZE 64
class heap_timer;
// 绑定socket和定时器
struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
heap_timer *timer;
};
// 定时器类
class heap_timer
{
public:
heap_timer(int delay)
{
expire = time(NULL) + delay;
}
public:
time_t expire = expire; // 定时器生效的绝对时间
void (*cb_func)(client_data *);
client_data *user_data;
};
// 时间堆类
class time_heap
{
public:
// 初始化一个大小为cap的空堆
// throw (std::exception) 表示该函数可能抛出std::exception 类型的异常
time_heap(int cap) throw (std::exception) : capacity(cap), cur_size(0)
{
array = new heap_timer*[capacity];
if(!array)
{
throw std::exception();
}
else
{
for(int i = 0; i < capacity; ++i)
{
array[i] = NULL;
}
}
}
// 用已有的堆数组初始化堆
time_heap(heap_timer **init_array, int size, int capacity) throw (std::exception) : cur_size(size), capacity(capacity)
{
if(capacity < size)
{
throw std::exception();
}
array = new heap_timer*[capacity];
if(!array)
{
throw std::exception();
}
for(int i = 0; i < capacity; ++i)
{
array[i] = NULL;
}
if(size != 0)
{
// 初始化数组
for(int i = 0; i < size; ++i)
{
array[i] = init_array[i];
}
// 最后一个非叶子节点到根节点调堆(下滤)
for(int i = (cur_size - 1); i >= 0; --i)
{
percolate_down(i);
}
}
}
~time_heap()
{
for(int i = 0; i < cur_size; ++i)
{
delete array[i];
}
delete []array;
}
public:
// 堆添加节点,上滤
void add_timer(heap_timer *timer) throw (std::exception)
{
if(!timer)
{
return;
}
if(cur_size >= capacity) // 容量不足,堆指针数组需要扩充一倍
{
resize();
}
// 新插入了一个元素,在堆最后插入,然后调堆
int hole = cur_size++;
int parent = 0;
// 上滤操作
for(; hole > 0; hole = parent) // hole = parent使得最终结果位置上移
{
parent = (hole - 1) / 2; // hole节点的父节点计算
if(array[parent]->expire <= timer->expire)
{
// 父节点小于插入的节点,满足小根堆要求,直接结束
break;
}
array[hole] = array[parent]; // 父节点节点下移
}
array[hole] = timer;
}
void del_timer(heap_timer *timer)
{
if(!timer)
{
return;
}
// 仅仅将目标定时器的回调函数设置为空,即延迟销毁
// 这将节省真正删除该定时器的开销,但易使堆数指针组膨胀
timer->cb_func = NULL;
}
// 获取堆顶部的定时器,expire最小者
heap_timer* top() const
{
if(empty())
{
return NULL;
}
return array[0];
}
// 删除堆顶部的定时器
void pop_timer()
{
if(empty())
{
return ;
}
if(array[0])
{
delete array[0];
// 将原来堆顶元素用堆的最后一个元素临时填充,然后下滤
array[0] = array[--cur_size];
percolate_down(0);
}
}
// 定时处理函数
void tick()
{
heap_timer *tmp = array[0];
time_t cur = time(NULL); // 循环遍历堆中每个定时器(堆用数组实现,故数组遍历),处理到期的定时器
while(!empty())
{
if(!tmp)
{
break;
}
// 如果堆顶定时器没到期,则退出循环,因为堆顶定时器到时时间使最近的,其他更晚
if(tmp->expire > cur)
{
break;
}
if(array[0]->cb_func)
{
array[0]->cb_func(array[0]->user_data);
}
// 将堆顶元素删除,同时让tmp指向新的堆顶
pop_timer();
tmp = array[0];
}
}
bool empty() const
{
return cur_size == 0;
}
private:
// 下面两个函数是被其他成员函数调用,不对外提供
// 最小堆的下滤操作,确保数组中以第hole个节点作为根的子树满足最小堆性质
void percolate_down(int hole)
{
heap_timer *temp = array[hole];
int child = 0;
// hole * 2 + 1为hole的左孩子
for(; (hole * 2 + 1) <= (cur_size - 1); hole = child) // hole = child是一个下滤的动作
{
child = hole * 2 + 1; // 左孩子
// 要选择expire小的孩子进行比较
if((child < (cur_size - 1)) && (array[child + 1]->expire < array[child]->expire))
{
child++;
}
if(array[child]->expire < temp->expire) // 下滤
{
array[hole] = array[child];
}
else
{
break;
}
}
array[hole] = temp;
}
// 将堆数组容量扩大一倍
void resize() throw (std::exception)
{
heap_timer **temp = new heap_timer*[2 * capacity];
for(int i = 0; i < 2 * capacity; ++i)
{
temp[i] = NULL;
}
if(!temp)
{
throw std::exception();
}
capacity = 2 * capacity;
// 把原来数组的内容拷贝到新的数组
for(int i = 0; i < cur_size; ++i)
{
temp[i] = array[i];
}
delete []array;
array = temp;
}
private:
heap_timer **array; // 定时器指针数组
int capacity;
int cur_size;
};
#endif
复杂度分析
-
对时间堆而言,添加一个定时器的时间复杂度为 O ( l o g n ) O(logn) O(logn)(由于需要上滤操作)
-
删除一个定时器的时间复杂度为 O ( 1 ) O(1) O(1),这是因为只是将目标定时器的回调函数设置为空
-
执行一个定时器的时间复杂度为 O ( 1 ) O(1) O(1)
参考文献
- 《Linux高性能服务器编程》,游双