写在文章开头
近期团队安排变得比较紧急,关于redis
系列的更新相对放缓一些,而我们今天要讨论的就是redis中关于事件模型的设计,我们都知道redis
通过单线程实现高效的网络IO处理,本文会从源码的角度来讲解一下redis
中文件事件驱动这一块的设计。
Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
详解文件事件的设计与实现
单线程reactor模式的设计
Linux
系统的思想是一切皆文件,所以在对于网络通信中服务端socket
也是为其分配一个文件描述符(fd)
并将其以文件的形式进行管理,并将其读写交给epoll
进行轮询处理,由此构成一个单线程的reactor
模型。
如下图所示,redis
服务端的主线程会在每一次循环时通过epoll
并将服务端socket
的fd
传入非阻塞获取该文件就绪的事件,然后根据事件的类型有序
的分发给对应的处理器进行处理。
对应的我们也给出redis
文件事件循环框架的核心代码,可以看到其入参为server.maxclients+REDIS_EVENTLOOP_FDSET_INCR
也就是10000+32+96
,这就是事件循环框架每次可容纳的socket
的文件描述符的大小:
//创建事件循环框架
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
对此我们步入aeCreateEventLoop
即可看到事件循环框架的核心实现,其本质就是完成事件循环框架的初始化,通过上一步传入的大小创建创建socket的数组空间,记录每一个接入的客户端socket
的事件。
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
//空间分配
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
//记录事件数组的大小
eventLoop->setsize = setsize;
//......
//创建事件循环框架
if (aeApiCreate(eventLoop) == -1) goto err;
//基于传入的setsize初始化每一个文件描述符的events空间为AE_NONE,表示当前socket没有就绪的事件
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
//......
}
后续的事件与轮询处理都会在aeMain
中进行不断的轮询并转交分发器进行处理,这里我们也给出对应的核心代码,可以看到该循环本质就是传入轮询框架eventLoop
,轮询所的事件AE_ALL_EVENTS
:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
//循环
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
我们步入其内部就可以看到对于服务端套接字(socket)的事件非阻塞轮询查看和事件分发处理逻辑:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
//......
//非阻塞获取服务端socket的就绪事件
numevents = aeApiPoll(eventLoop, tvp);
//基于返回值处理事件
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
//获取事件的mask值
int mask = eventLoop->fired[j].mask;
//获取该事件是那个客户端套接字
int fd = eventLoop->fired[j].fd;
int rfired = 0;
//如果是读事件则调用rfileProc指针的函数进行命令处理
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
//如果是写则将该事件交给写处理器wfileProc将数据写回客户端
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
//......
}
处理redis客户端连接请求
了解整体的模型设计之后,我们就来看看各个事件处理器的具体实现,当epoll
轮询的有客户端进行连接时,就会将该事件分发给连接处理器,而连接处理器的核心逻辑为:
- 记录套接字信息。
- 为该客户端初始化一个redisClient结构体记录其信息。
- 将其套接字
(socket)
的事件注册到epoll中后续进行轮询处理。
这里我们给出redis
的main方法关于redis
服务端acceptTcpHandler
处理的初始化逻辑,可以看到它会将acceptTcpHandler
和服务端套接字的AE_READABLE
进行绑定:
for (j = 0; j < server.ipfd_count; j++) {
//为服务端套接字绑定acceptTcpHandler连接处理器
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
//......
}
}
这里我们也给出acceptTcpHandler
处理器的实现逻辑,可以看到其内部的处理逻辑本质获取套接字的文件描述符,并基于这个文件描述符fd
为其进行初始化生成redisClient
并将事件注册到epoll
让epoll
进行轮询处理:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
while(max--) {
//获取套接字的文件描述符
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
//......
//为该套接字进行初始化,并为其生产客户端对象进行管理,并将客户端读写事件交给epoll
acceptCommonHandler(cfd,0);
}
}
步入acceptCommonHandler
的逻辑我们即可看到创建redisClient
的方法createClient
,从核心逻辑可以看出如果redis
客户端对象创建失败,它会直接关闭套接字。同理如果创建的redis客户端达到我们上文maxclients
的上限也会将其释放并调用write
方法提交写事件告知客户端已达上限:
static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
//为客户端套接字创建redisClient对象并注册事件到epoll中
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
//.......
//客户端已达上线输出错误并释放客户端
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
//输出错误
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
//自增失败连接数
server.stat_rejected_conn++;
//释放客户端对象
freeClient(c);
return;
}
server.stat_numconnections++;
c->flags |= flags;
}
最后再来说说创建客户端的处理逻辑,如我们上文所说就是将客户端读事件注册到epoll
中,让epoll
进行轮询并分发处理,完成该操作后再初始化客户端的各种基础信息:
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
//......
//将当前客户端套接字读请求即AE_READABLE事件注册到epoll中,并绑定命令处理器readQueryFromClient
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
//初始化客户端对象各种参数
selectDb(c,0);
c->id = server.next_client_id++;
c->fd = fd;
//.......
return c;
}
客户端命令读取与回复
经过上述的处理后,redis
客户但就和服务端建立连接,每当客户端发起各种指令操作时,redis
的epoll就会轮询到这个客户端套接字的读事件,并将其交给命令处理器处理,完成后将处理结果交给命令回复处理器提交写事件,下次epoll
轮询到这个事件就会将结果交给redis
客户端
我们再次给出事件轮询的代码片段,该片段位于ae.c
文件下,它会调用aeApiPoll
轮询所有套接字对应的fd是否有事件,以客户端命令请求为例,该方法就会返回redis客户端
套接字的事件对象,然后计算得到是AE_READABLE
事件,就将其交给readQueryFromClient
处理器处理,完成后会将处理结果即命令回复交给写处理器sendReplyToClient
中,下次aeApiPoll
就会轮询到该事件并将其回复给redis
客户端:
//轮询所有套接字查看是否有就绪事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
//如果是AE_READABLE则说明客户端发起命令,调用rfileProc走到readQueryFromClient方法,完成后生成写事件下次循环时就会走到下方sendReplyToClient的处理逻辑
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
//读请求的处理结果就会走到AE_WRITABLE的处理器sendReplyToClient将结果写回客户端
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
小结
自此我们将redis单线程的reactor模型以及对应的文件驱动设计都分析完毕,希望对你有所帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
参考
《redis设计与实现》