阅读本文前,建议先看:Redis 事件循环(Event Loop)。
Redis 6 支持接收 3 种连接,对应的接收处理器如下:
- TCP:
acceptTcpHandler
; - TLS:
acceptTLSHandler
; - Unix Socket:
acceptUnixHandler
。
本文只关注 TCP 连接。
对于监听 Socket,没必要为它们创建
connection
对象和client
对象。
acceptTcpHandler
在事件循环过程中,epoll 监听到连接(读)事件,就会调用事先注册好的接收处理器 acceptTcpHandler
来处理。
acceptTcpHandler
定义在 networking.c
中。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
// 每一次最多接收MAX_ACCEPTS_PER_CALL(1000)个连接
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN]; // 保存客户端ip地址
// 未使用的变量,编译器会报警,这里关闭编译器警告
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
// 不断接收客户端连接,直到满1000个
while(max--) {
// 最终调用socket层系统调用 accept 接收连接
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
anetCloexec(cfd); // 设置FD_CLOEXEC标志
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
// 目前只有 Unix Socket 会传递 CLIENT_UNIX_SOCKET 标志
// tcp 和 tls 都是 0
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
accept 获取 fd
anetTcpAccept
实际调用 accept
系统调用从内核拿完成 TCP 三次握手的连接,返回连接的句柄(fd)。后续对连接的所有操作都需要直接或间接的依赖这个 fd。
Linux 内核协议栈帮我们完成 TCP 三次握手,然后内核将建立好的 TCP 连接放到监听 Socket 的全连接队列,等待用户态应用程序调用 accept 系统调用获取连接。
创建连接对象
调用 connCreateAcceptedSocket
创建 connection
对象,并用 connection
对象包装连接句柄 fd。后续对底层 fd 的操作都由 connection
对象代理。
connection *connCreateAcceptedSocket(int fd) {
connection *conn = connCreateSocket(); // 创建connection对象
conn->fd = fd; // 关联底层fd
conn->state = CONN_STATE_ACCEPTING;
return conn;
}
connection *connCreateSocket() {
connection *conn = zcalloc(sizeof(connection)); // 分配连接对象
conn->type = &CT_Socket; // 链接类型是CT_Socket
conn->fd = -1;
return conn;
}
连接对象
struct connection {
ConnectionType *type; // 连接类型
ConnectionState state; // 连接状态
short int flags;
short int refs; // 引用计数
int last_errno;
void *private_data; // 指向client结构指针
ConnectionCallbackFunc conn_handler; // 接收处理器
ConnectionCallbackFunc write_handler; // 写处理器
ConnectionCallbackFunc read_handler; // 写处理器
int fd; // 封装了底层的fd
};
连接类型
连接类型被抽象成 ConnectionType
结构体,该结构体中定义了一系列函数接口,具体的连接类型(TCP/TLS)实现它们自己的函数接口。
TCP 连接的 ConnectionType
实现是 CT_Socket
(Unix Socket 实际上复用的 TCP 连接类型)。Redis 6 引入了 SSL/TLS,因此 TLS 的 ConnectionType
实现是 CT_TLS
。
CT_Socket
定义在 connection.c
中。
ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,
.close = connSocketClose,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
.connect = connSocketConnect,
.set_write_handler = connSocketSetWriteHandler,
.set_read_handler = connSocketSetReadHandler,
.get_last_error = connSocketGetLastError,
.blocking_connect = connSocketBlockingConnect,
.sync_write = connSocketSyncWrite,
.sync_read = connSocketSyncRead,
.sync_readline = connSocketSyncReadLine,
.get_type = connSocketGetType
};
通用的 accept 流程
将创建的 connection
对象作为参数调用 acceptCommonHandler
,接下来进入通用的 accept 流程。
对于 TCP 和 TLS ,flags
都为 0,对于 Unix_Socket,flags
为 CLIENT_UNIX_SOCKET
。
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
char conninfo[100];
UNUSED(ip);
// 检查连接状态
if (connGetState(conn) != CONN_STATE_ACCEPTING) {
// ..... 记录日志
connClose(conn); // 关闭连接
return;
}
/* 检查当前总连接数(客户端 + 集群内部连接)是否超过 maxclients 限制。*/
if (listLength(server.clients) + getClusterConnectionsCount() >= server.maxclients)
{
char *err;
if (server.cluster_enabled)
err = "-ERR max number of clients + cluster "
"connections reached\r\n";
else
err = "-ERR max number of clients reached\r\n";
// 发送错误消息
if (connWrite(conn,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++; // 被拒绝连接数加1,用于统计
connClose(conn); // 关闭连接
return;
}
// 创建client客户端对象,并包装连接对象
if ((c = createClient(conn)) == NULL) {
// .... 记录警告日志
connClose(conn); /* May be already closed, just ignore errors */
return;
}
/* Last chance to keep flags */
c->flags |= flags;
/* 启动连接接受操作。
* 请注意,connAccept() 在此处可以自由地执行以下两件事之一:
* 立即调用 clientAcceptHandler() 函数;
* 安排在未来的某个时间调用 clientAcceptHandler() 函数。
* 正因为如此,在调用 connAccept() 之后,我们不能再执行其他任何操作。
*/
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
char conninfo[100];
if (connGetState(conn) == CONN_STATE_ERROR)
serverLog(LL_WARNING,
"Error accepting a client connection: %s (conn: %s)",
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
freeClient(connGetPrivateData(conn)); // 销毁客户端对象
return;
}
}
创建客户端对象
createClient
方法用于创建 client,它代表着连接到 Redis 客户端,每个客户端都有各自的输入缓冲区和输出缓冲区,输入缓冲区存储客户端通过网络发送过来的数据,输出缓冲区则存储着 Redis 对客户端的响应数据。client 有三种类型,不同类型的对应缓冲区的大小都不同。
- 普通客户端:除了复制和订阅的客户端之外的所有连接
- slave 客户端:用于主从复制,主节点会为每个从节点单独建立一条连接用于命令复制。
- 订阅客户端:用于发布订阅功能。
createClient
位于 networking.c
。
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));// 分配对象
// 将 NULL 作为 conn 传入就有可能创建一个无连接的客户端对象
// 这是很有用的,因为所有命令都需要在客户端的上下文中执行。
// 当在其他上下文(例如一个 Lua 脚本)中执行命令时,我们就需要一个无连接的客户端对象。
if (conn) {
connNonBlock(conn); // 连接设置为非阻塞
// 设置TCP_NODELAY选项,禁用 nagle 算法,存放到内核缓冲区中的数据会立即发出。
// 否则如果一次放到内核缓冲区中的数据数据包太小,则会在多个小的数据包凑成一个足够大的数据包后才会将数据发出。
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive); // 设置SO_KEEPALIVE选项,TCP层面的心跳检测
// 设置连接的读处理器为readQueryFromClient
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c); // conn->private_data = client
}
selectDb(c,0); // 默认选择0号数据库
uint64_t client_id;
atomicGetIncr(server.next_client_id,client_id,1);
c->id = client_id; // 分配一个单调递增的客户端ID
c->resp = 2;
c->conn = conn;
c->name = NULL;
c->bufpos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
.....
if (conn) linkClient(c); // 将client添加到server.clients链表尾
.....
return c;
}
linkClient
/* 这个函数将客户端链接到客户端的全局链表中。而 unlinkClient() 函数则执行相反的操作,此外它还会做其他一些事情。*/
void linkClient(client *c) {
// 添加到全局链表 server.clients 的尾部
listAddNodeTail(server.clients,c);
/* 请注意,我们会记住存储客户端的链表节点,
* 通过这种方式,在 unlinkClient() 函数中移除客户端时,将不需要进行线性扫描,
* 而仅仅是一个时间复杂度为常量的操作。 */
c->client_list_node = listLast(server.clients);
// 反转client id的字节序,将转换后的id作为key,client对象作为value,保存到基数树server.clients_index。
// 当后续需要通过 id 获取 client 对象时会(例如 CLIENT UNBLOCK 命令)从基数树中查询。
uint64_t id = htonu64(c->id);
raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
}
连接对象为 NULL 的 client 不会被添加到 server.clients
和 server.clients_index
。
基数树可视化工具:https://www.cs.usfca.edu/~galles/visualization/RadixTree.html
设置连接的读处理器
connSetReadHandler(conn, readQueryFromClient);
刚建立的连接肯定首先需要监听它的读事件。
/* 注册一个读处理器,当连接可读时该处理器将被调用。如果传入的值为NULL,则会移除现有的处理器 */
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_read_handler(conn, func);
}
TCP 连接的连接类型是 CT_Socket
,其 set_read_handler
函数接口的实现是 connSocketSetReadHandler
。
/* 注册一个读处理器,当连接可读时该处理器将被调用。如果传入的值为NULL,则会移除现有的处理器 */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
// 重复设置相同的读处理器
if (func == conn->read_handler) return C_OK;
// func = readQueryFromClient
// 如果 func=NULL,意味着移除现有的读处理器
conn->read_handler = func;
// 如果移除读处理器
if (!conn->read_handler)
// 取消对fd读事件的关注
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else
// 添加到epoll中监听读事件
// 注意:读事件的回调函数是connSocketEventHandler
// 文件事件aeFileEvent的clientData字段指向connection对象
// 为什么叫clientData而不是connData,因为redis6之前没有connection对象,只有client对象
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
注意:
读事件处理器是connSocketEventHandler
。
连接的读处理器是readQueryFromClient
。
事件循环中,首先触发的是读事件处理器connSocketEventHandler
,该函数再调用连接读处理器readQueryFromClient
。
虽然已经加入 epoll 进行监听了,但此时,连接对象的状态还是 CONN_STATE_ACCEPTING
,后面会将其改为 CONN_STATE_CONNECTED
。
connAccept()
connAccept(conn, clientAcceptHandler)
/* 连接模块不处理监听和接受套接字的操作,
* 因此我们假设在创建传入连接时已经有一个套接字。
*
* 因此,提供的文件描述符 (fd) 应该与一个已经通过 accept() 接受的套接字相关联。
*
* connAccept() 可能会直接调用 accept_handler(),也可能会返回并在稍后调用它。
* 这种行为有点不太常规,但目的是在不需要额外握手的情况下,减少等待下一个事件循环的需求。
*
* 重要提示:accept_handler 可能会决定关闭连接,调用 connClose()。
* 为了确保操作安全,在这种情况下,连接只会被标记为 CONN_FLAG_CLOSE_SCHEDULED,
* 并且 connAccept() 会返回一个错误。
*
* connAccept() 的调用者必须始终检查返回值,并且在出现错误 (C_ERR) 时必须调用 connClose()。
*/
static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
return conn->type->accept(conn, accept_handler);
}
CT_Socket
对 accept
接口的实现是 connSocketAccept
,位于 connection.c
。
static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
int ret = C_OK;
if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
conn->state = CONN_STATE_CONNECTED;
connIncrRefs(conn); // 增加连接对象的引用计数
// 调用 clientAcceptHandler
if (!callHandler(conn, accept_handler)) ret = C_ERR;
connDecrRefs(conn); // 引用计数减1
return ret;
}
注意:连接的状态设置为
CONN_STATE_CONNECTED
。
调用 clientAcceptHandler
函数,位于 networking.c
。
clientAcceptHandler
主要完成连接验证、安全策略检查及信息统计。
void clientAcceptHandler(connection *conn) {
client *c = connGetPrivateData(conn);
// 检查连接状态
if (connGetState(conn) != CONN_STATE_CONNECTED) {
// ..... 记录警告日志
freeClientAsync(c); // 异步释放客户端资源
return;
}
/* 1. Redis处于保护模式(默认开启)。
* 2. 未绑定特定网络接口(bindaddr_count == 0,意味着绑定所有接口)。
* 3. 用户未设置密码(USER_FLAG_NOPASS)。
* 4. 客户端不是通过 Unix Socket 连接通信(Unix Socket是本地通信,自然无风险)
* 如果上面4个情况同时满足,需要Redis向用户说明如何解决该问题 */
if (server.protected_mode &&
server.bindaddr_count == 0 &&
DefaultUser->flags & USER_FLAG_NOPASS &&
!(c->flags & CLIENT_UNIX_SOCKET))
{
char cip[NET_IP_STR_LEN+1] = { 0 };
// 调用connPeerToString获取客户端IP地址
connPeerToString(conn, cip, sizeof(cip)-1, NULL);
// 检查是否为本地回环地址(IPv4 是 127.0.0.1 或 IPv6 是 ::1)
// 若IP非本地,向客户端发送错误信息并终止连接。
if (strcmp(cip,"127.0.0.1") && strcmp(cip,"::1")) {
char *err =
"-DENIED Redis is running in protected mode because protected "
.....; //一大串错误信息
// 向非本地客户端发送错误信息
if (connWrite(c->conn,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++; // 记录被拒绝的连接数,用于监控
freeClientAsync(c); // 异步释放客户端对象
return;
}
}
// 成功接收连接
server.stat_numconnections++; // 增加当前连接数,用于监控
moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,
c); // 通知已注册模块(如监控插件)有新的客户端连接
}
上面的一大串错误消息大致意思是:
拒绝访问:Redis 正在保护模式下运行,因为已启用保护模式,未指定绑定地址,并且未向客户端请求认证密码。在此模式下,仅接受来自环回接口的连接。
如果您想从外部计算机连接到 Redis,您可以采用以下解决方案之一:
- 只需从环回接口(通过从运行 Redis 服务器的同一主机连接到 Redis)发送命令 “CONFIG SET protected-mode no” 来禁用保护模式。但是,如果您这样做,请确保 Redis 不会从互联网上被公开访问。使用 “CONFIG REWRITE” 使此更改永久生效。
- 或者,您可以通过编辑 Redis 配置文件,将保护模式选项设置为 “no”,然后重新启动服务器来禁用保护模式。
- 如果您只是为了测试而手动启动服务器,请使用 “–protected-mode no” 选项重新启动它。
- 设置一个绑定地址或一个认证密码。
注意:为了使服务器开始接受来自外部的连接,您只需执行上述操作之一即可。
后话
由上面我们知道,所有的连接统一设置了相同读处理器 connSocketEventHandler
并加入了 epoll 中进行监听。当对端向 Redis 发送请求后,会触发 connSocketEventHandler
的调用。
从 connSocketEventHandler
开始,之后做了哪些事,在后面的文章中会讲解。