接上篇:redis7.x源码分析:(4) ae事件处理器(一),接下来看定时器事件的处理。
创建定时器事件:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
// 通过累加timeEventNextId生成新id
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
// 设置触发时间为:当前单调时钟 + 需要定时时长,单位是微妙
te->when = getMonotonicUs() + milliseconds * 1000;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
// 新加入的时间直接添加到链表头
eventLoop->timeEventHead = te;
return id;
}
Redis的定时器实现比较简单,它并未使用最小堆或者时间轮之类的方式来管理定时事件,而是直接使用一个双向链表把所有的定时器事件串起来,然后通过遍历去实现一些功能。这么做主要是因为Redis中定时器使用量非常少,不需要管理大量的定时事件,所以就不需要排序;另外没有频繁注册删除的操作,这样也就不需要快速查找。
删除定时器事件:
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
aeTimeEvent *te = eventLoop->timeEventHead;
while(te) {
if (te->id == id) {
// 对于需要删除的定时器只把它的id置为无效,等待下一轮消息循环 processTimeEvents 中再做处理
te->id = AE_DELETED_EVENT_ID;
return AE_OK;
}
te = te->next;
}
return AE_ERR; /* NO event with the specified ID found */
}
对于定时器的删除操作不会立即执行,而是将定时器修改为无效,在下一轮的 processTimeEvents 中处理。
获取最早超时时间:
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
aeTimeEvent *te = eventLoop->timeEventHead;
if (te == NULL) return -1;
aeTimeEvent *earliest = NULL;
// 遍历链表找到最早触发的定时器事件
while (te) {
if (!earliest || te->when < earliest->when)
earliest = te;
te = te->next;
}
monotime now = getMonotonicUs();
// 返回最早触发的定时器距当前时间的剩余时长
return (now >= earliest->when) ? 0 : earliest->when - now;
}
从代码中可以看出,最早超时时间就是通过遍历链表找出超时时间when最小的事件来实现的。aeProcessEvents中会调用它获取下个期望的超时时间,然后传递给 aeApiPoll 进行超时等待。
处理定时器事件:
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
// 实际使用的最大id是 timeEventNextId-1
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
while(te) {
long long id;
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
// 清理失效的定时器
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
// 判断下引用计数,还在被引用的直接跳过
if (te->refcount) {
te = next;
continue;
}
// 从链表中删除定时器节点
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
// 提高定时器执行精度,执行回调后更新now
now = getMonotonicUs();
}
zfree(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
// 超时了
if (te->when <= now) {
int retval;
id = te->id;
te->refcount++;
// 执行注册的定时器回调,并返回新的定时时间
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
// 提高定时器执行精度,执行回调后更新now
now = getMonotonicUs();
// 回调执行完后,循环定时器就更新时间点,否则就标记成删除等待清理
if (retval != AE_NOMORE) {
te->when = now + retval * 1000;
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
它是在 aeProcessEvents 的函数末尾调用的,通过遍历链表确认是否有事件超时,然后执行相应的超时回调。
未经许可,请勿转载!作者:jwybobo2007