写在文章开头
对于每一个建立的连接redis都会通过redisClient
来管理建立的socket连接的信息,本文将从源码的分析的角度来剖析的Redis
客户端的基本设计和实现。
Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
详解redis客户端
redis客户端基本数据结构
我们先简单介绍一下redis
的基本数据结构:
- 建立连接时
redis
服务端会通过fd
记录分配的客户端socket
文件描述符。 - 可以通过
name
字段来设置客户端的名称。 - 客户端操作的数据库都通过
db
来管理,记录操作的数据库的号码。 - 客户端可操作的指令集以及上一次指向的指令。
- 每当客户端通过指令进行管理数据库键值对时,对应的命令就会存储到
querybuf
,基于querybuf
我们可以解析出命令、key
、value
等客户发送指令信息。 - 对应常规的响应结果,
redis
客户端会用buf
记录。
typedef struct redisClient {
//客户端id
uint64_t id; /* Client incremental unique ID. */
//当前客户端socket的文件描述符
int fd;
//记录当前客户端操作的数据库指针
redisDb *db;
//客户端操作指令集和上一条指令的指针
struct redisCommand *cmd, *lastcmd;
//......
robj *name; /* As set by CLIENT SETNAME */
//客户端传入的字符串指令
sds querybuf;
//......
//响应给客户端的输出缓冲区和偏移量
int bufpos;
char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;
连接建立与客户端创建
了解了客户端的基本结构之后,我们就从客户端发起连接并建立通信的整个过程了解一下客户端初始化的逻辑,当redis
服务端收到客户端的连接请求后,服务端会解析出这个establish socket的文件描述符fd,然后创建一个redisClient
对象记录该fd,并完成客户端初始化,而整个初始化的核心步骤大致为:
- 分配可操作数据库指针。
- 记录客户端socket套接字的fd。
- 初始化各种输入、输出、指令字段等各种初始化操作。
- 将其添加到服务端
cliens
链表中。
对应的我们给出接收客户端连接的操作的入口,即redis服务端处理客户端连接的函数acceptTcpHandler
,它拿到客户端socket的文件描述符fd之后,调用acceptCommonHandler完成客户端初始化和追加到server的客户端链表中:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
while(max--) {
//获取客户端socket套接字的文件描述符fd
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
//......
//基于cfd 完成客户端初始化并添加到服务端的clients链表中
acceptCommonHandler(cfd,0);
}
}
可以看到acceptCommonHandler
会基于这个文件描述符创建redis
客户端,并将其追加到redis
链表的末尾:
tatic void acceptCommonHandler(int fd, int flags) {
redisClient *c;
//基于客户端socket的文件描述符创建redis客户端,如果创建失败则直接关闭socket
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
//如果当前客户端数没有超过最大值(默认10000)则添加到链表clients末尾
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
server.stat_numconnections++;
c->flags |= flags;
}
了解了整体流程我们查看一下上文就说的客户端初始化代码,如笔者所说它会完成客户端基础信息初始化、socket的fd文件描述符信息维护、以及指令、输入、输出缓冲区空间初始化:
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
//......
//默认分配数据库0
selectDb(c,0);
//初始化客户端id
c->id = server.next_client_id++;
//记录客户端socket的文件描述符的值
c->fd = fd;
//名字默认空
c->name = NULL;
//输出缓冲区偏移量为0
c->bufpos = 0;
//初始化输入缓冲区
c->querybuf = sdsempty();
//......
//命令指针初始化
c->cmd = c->lastcmd = NULL;
//......
//追加到server的clients链表中进行统一管理
if (fd != -1) listAddNodeTail(server.clients,c);
//......
return c;
}
服务端执行客户端发送的指令
完成基本连接建立工作之后,客户端就可以通过指令和服务端进行交互了,假设我们通过redis-cli
键入指令set key value
,redis
服务端收到该指令后,通过该字符串会解析出对应的命令、key
和value
,然后通过命令的字符串set
定位到指令setCommand
,结合解析出的key
、value
完成键值对保存操作:
redis
服务端轮询到对应客户端的指令后,通过fd
定位到这个客户端的redisClient
对象,调用该客户端的命令处理器readQueryFromClient
,将命令字符串querybuf
传入,调用processInputBuffer
解析出命令、key
、value
存入argv
中:
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
int nread, readlen;
//......
//调用processInputBuffer解析字符串并执行指令
processInputBuffer(c);
server.current_client = NULL;
}
我们步入processInputBuffer
查看逻辑,可以看到它会调用processInlineBuffer
单行字符串解析或者processMultibulkBuffer
将字符串转为指令、key
、value
存到客户端的argv
数组中,然后再调用processCommand
处理解析出来的指令:
void processInputBuffer(redisClient *c) {
//循环遍历querybuf解析指令和键值对
while(sdslen(c->querybuf)) {
//......
//如果命令为单行则调用processInlineBuffer,如果多行则调用processMultibulkBuffer,将解析结果存入argv中
if (c->reqtype == REDIS_REQ_INLINE) {
if (processInlineBuffer(c) != REDIS_OK) break;
} else if (c->reqtype == REDIS_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != REDIS_OK) break;
} else {
redisPanic("Unknown request type");
}
//.......
if (c->argc == 0) {
resetClient(c);
} else {
//调用processCommand处理解析出来的指令和键值对
if (processCommand(c) == REDIS_OK)
resetClient(c);
}
}
}
基于上一步解析出来的字符串数组argv
,我们来到processCommand
,通过数组argv[0]
定位到命令为set
,于是从当前服务端的命令表拿到setCommand
方法并赋值给cmd
指针,注意这里的命令表在客户端最后通过call
函数完成调用:
int processCommand(redisClient *c) {
//......
//基于querybuf解析结果得到的数组argv的0索引位置定位到指令,以我们的操作为例则是set,然后赋值给cmd和lastcmd指针
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
//......
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
//......
} else {//调用call方法执行我们定位到的指令函数
call(c,REDIS_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
最终call
方法会走到setCommand
函数,其内部调用setGenericCommand
完成我们的key value存储后,响应OK给客户端,自此一次客户端命令解析和操作完成:
void setCommand(redisClient *c) {
//......
//调用setGenericCommand进行键值对存储
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
//......
//调用key value存到客户端指向的db中,以本文演示的客户端为例,就是存到db0中
setKey(c->db,key,val);
//......
//响应操作结果给客户端
addReply(c, ok_reply ? ok_reply : shared.ok);
}
小结
自此我们从redis客户端创建和指令交互的流程详细的分析的redis客户端这个结构体在redis中的作用,希望对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
参考
《redis设计与实现》