文章目录
- 摘要
- 了解Linux的epoll
- 了解Reactor模型
- 源码
- initServer
- initListeners
- aeMain
- 事件管理器
- aeProcessEvents
- 读事件
摘要
有时候在面试的时候会被问到Redis为什么那么快?有一点就是客户端请求和应答是基于I/O多路复用(比如linux的epoll)的Reactor模型,今天我们就基于这个点顺着Redis的源码进行探索。
PS:本文基于Redis7.2.0源码
建议阅读本文之前熟悉Linux的epoll和Reactor模型相关知识
了解Linux的epoll
在UNIX网络编程这本书中对socket有清晰的描述,server端的可用系统函数有socket、bind、listen、accept、read、write、close,其中accept、read、write是socket的获取完成三次握手的连接,读,写三种事件。
最基本的socket模型是阻塞模型,随着技术的演进,又有了非阻塞,I/O多路复用,信号驱动、异步,共五种I/O模型。其中I/O多路复用模型实用的最广的,而其又分为select,poll,epoll三种,性能依次提升,Redis在linux下的网络就是基于epoll的。
了解Reactor模型
Reactor模型在网络编程中非常有名,比如Java的Netty,优点是,相比多线(进)程在相同资源条件下可以同时处理更多的连接。Reactor模型分为单Reactor单线程、单Reactor多线程、多Reactor(一主多从)多线程三种类型。
Redis在V6.0之前都是单Reactor单线程类型,V6.0改造成了单Reactor多线程类型。
源码
我们知道 c 语言入口函数一般都是 main 函数,所以我们先从入口函数下手,Redis server端的入口函数在server.c文件中。
int main(int argc, char **argv) {
//初始化服务,比如配置等
initServer();
//初始化网络listen
initListeners();
//开始主线程循环
aeMain(server.el);
aeDeleteEventLoop(server.el);
}
initServer
void initServer(void) {
//初始化server这个全局变量
/* Initialization after setting defaults from the config system. */
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
//基于配置文件配置的最大连接数创建事件循环器
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
.....
//创建定时任务管理器(比如key过期,客户端超时等),并设置回调函数为serverCron
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Register a readable event for the pipe used to awake the event loop
* from module threads. */
if (aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE,
modulePipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module pipe.");
}
/* Register before and after sleep handlers (note this needs to be done
* before loading persistence since it is used by processEventsWhileBlocked. */
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
}
initListeners
这里实现了socket的系统函数的scoket,bind,listen三步调用。
根据配置有三种选择TCP/TLS/Unix */,三者都实现了ConnectionType结构体的定义,位于connection.h文件中,在connection.c文件的connTypeInitialize函数会注册这三种类型(CT_Socket,CT_TLS,CT_Unix)
当在配置文件配置了port(比如6379)时,就选用CT_Socket。
void initListeners() {
/* Setup listeners from server config for TCP/TLS/Unix */
int conn_index;
connListener *listener;
if (server.port != 0) {//listen for TCP,
conn_index = connectionIndexByType(CONN_TYPE_SOCKET);
if (conn_index < 0)
serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET);
listener = &server.listeners[conn_index];
listener->bindaddr = server.bindaddr;//我们在配置文件配置bind关键字时,可以配置多个,因此是个数组
listener->bindaddr_count = server.bindaddr_count;
listener->port = server.port;
listener->ct = connectionByType(CONN_TYPE_SOCKET);//获取CT_Socket
}
......
/* create all the configured listener, and add handler to start to accept */
int listen_fds = 0;
for (int j = 0; j < CONN_TYPE_MAX; j++) {
listener = &server.listeners[j];
if (listener->ct == NULL)
continue;
if (connListen(listener) == C_ERR) {//该函数会调用CT_Socket的listen函数,最后完成scoket,bind,listen的系统调用,并设置为非阻塞I/O
//在CT_Socket下调用链路为connListen=>(socket.c文件)connSocketListen=>listenToPort=>anetTcpServer(完成
//scoket,bind,listen的系统调用) ,anetNonBlock(设置为非阻塞I/O)
serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
exit(1);
}
if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));
listen_fds += listener->count;
}
....
}
本函数将socket设置为可读状态,并绑定CT_Socket的accept_handler函数,即socket.c的connSocketAcceptHandler函数,用于处理客户端的连接建立
int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) {
int j;
for (j = 0; j < sfd->count; j++) {
if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) {
/* Rollback */
for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
return C_ERR;
}
}
return C_OK;
}
aeMain
本函数用于进入主线程事件循环逻辑
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
本章中很多ae开头的函数,比如aeMain、aeCreateFileEvent、aeProcessEvents等,这些函数位于ae.c文件,是Redis事件管理器的具体实现,具体见下一章。
事件管理器
在上一章中知道了通过aeMain开启了事件循环,那么先看一下aeProcessEvents函数。
在聊aeProcessEvents函数之前我们先看下Redis如何根据系统选择操作系统提供的I/O库的。
在ae.c文件开头可以看到如下代码:
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c" //linux下选择epoll
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
aeProcessEvents
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
...
//执行initServer函数中注册的beforesleep回调函数
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);//获取socket事件,如果是epoll则是执行epoll_wait获取有事件的socket
/* Don't process file events if not requested. */
if (!(flags & AE_FILE_EVENTS)) {
numevents = 0;
}
//执行initServer函数中注册的aftersleep 回调函数
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
...
for (j = 0; j < numevents; j++) {
//读事件
//在上一章的createSocketAcceptHandler已经了解到socket listen后第一次绑定了socket.c的connSocketAcceptHandler
//那么如果是accept事件就会调用connSocketAcceptHandler,继而调用networking.c的createClient
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
//写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
}
}
读事件
socket的accept和read都是读事件。所以在CT_Socket下,客户端第一次建立连接时调用socket.c的connSocketAcceptHandler
accept之后等待read事件
static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);//调用系统函数accept,并设置socket为非阻塞的
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip);
}
}
//在networking.c文件
void acceptCommonHandler(connection *conn, int flags, char *ip) {
...
/* Create connection and client */
if ((c = createClient(conn)) == NULL) {
char addr[NET_ADDR_STR_LEN] = {0};
char laddr[NET_ADDR_STR_LEN] = {0};
connFormatAddr(conn, addr, sizeof(addr), 1);
connFormatAddr(conn, laddr, sizeof(addr), 0);
serverLog(LL_WARNING,
"Error registering fd event for the new client connection: %s (addr=%s laddr=%s)",
connGetLastError(conn), addr, laddr);
connClose(conn); /* May be already closed, just ignore errors */
return;
}
...
}
client *createClient(connection *conn) {
...
if (conn) {
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);//重点。调用CT_Socket的set_read_handler函数,就socket.c文件的connSocketSetReadHandler
connSetPrivateData(conn, c);
}
...
}
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;//设置read事件的回调函数为readQueryFromClient
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else
if (aeCreateFileEvent(server.el,conn->fd,//创建AE_READABLE事件,并绑定读回调函数为CT_Socket的ae_handler函数,等待aeProcessEvents循环触发进行回调
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
下面看看ae_handler函数,即scoket.c文件的connSocketEventHandler干了什么
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
/* Handle normal I/O flows */
if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;//可以看到会调conn->read_handler函数,即connSocketSetReadHandler中绑定的readQueryFromClient
}
/* Fire the writable event. */
if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
}
readQueryFromClient 函数就是解析客户端数据了,