Redis中的Reactor模型源码探索

news2024/12/25 9:25:59

文章目录

  • 摘要
      • 了解Linux的epoll
      • 了解Reactor模型
  • 源码
      • initServer
      • initListeners
      • aeMain
  • 事件管理器
      • aeProcessEvents
      • 读事件

摘要

有时候在面试的时候会被问到Redis为什么那么快?有一点就是客户端请求和应答是基于I/O多路复用(比如linux的epoll)的Reactor模型,今天我们就基于这个点顺着Redis的源码进行探索。

PS:本文基于Redis7.2.0源码

建议阅读本文之前熟悉Linux的epoll和Reactor模型相关知识

了解Linux的epoll

在UNIX网络编程这本书中对socket有清晰的描述,server端的可用系统函数有socket、bind、listen、accept、read、write、close,其中accept、read、write是socket的获取完成三次握手的连接,读,写三种事件。

最基本的socket模型是阻塞模型,随着技术的演进,又有了非阻塞,I/O多路复用,信号驱动、异步,共五种I/O模型。其中I/O多路复用模型实用的最广的,而其又分为select,poll,epoll三种,性能依次提升,Redis在linux下的网络就是基于epoll的。
在这里插入图片描述

了解Reactor模型

Reactor模型在网络编程中非常有名,比如Java的Netty,优点是,相比多线(进)程在相同资源条件下可以同时处理更多的连接。Reactor模型分为单Reactor单线程、单Reactor多线程、多Reactor(一主多从)多线程三种类型。

Redis在V6.0之前都是单Reactor单线程类型,V6.0改造成了单Reactor多线程类型。

源码

我们知道 c 语言入口函数一般都是 main 函数,所以我们先从入口函数下手,Redis server端的入口函数在server.c文件中。

int main(int argc, char **argv) {
   //初始化服务,比如配置等
	initServer();
	//初始化网络listen
	initListeners();
	//开始主线程循环
	aeMain(server.el);
  aeDeleteEventLoop(server.el);
}

initServer

void initServer(void) {
   //初始化server这个全局变量
	/* Initialization after setting defaults from the config system. */
   server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
   //基于配置文件配置的最大连接数创建事件循环器
   server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
   .....
    //创建定时任务管理器(比如key过期,客户端超时等),并设置回调函数为serverCron
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    /* Register a readable event for the pipe used to awake the event loop
     * from module threads. */
    if (aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE,
        modulePipeReadable,NULL) == AE_ERR) {
            serverPanic(
                "Error registering the readable event for the module pipe.");
    }

    /* Register before and after sleep handlers (note this needs to be done
     * before loading persistence since it is used by processEventsWhileBlocked. */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
}

initListeners

这里实现了socket的系统函数的scoket,bind,listen三步调用。

根据配置有三种选择TCP/TLS/Unix */,三者都实现了ConnectionType结构体的定义,位于connection.h文件中,在connection.c文件的connTypeInitialize函数会注册这三种类型(CT_Socket,CT_TLS,CT_Unix)

当在配置文件配置了port(比如6379)时,就选用CT_Socket。

void initListeners() {
 /* Setup listeners from server config for TCP/TLS/Unix */
    int conn_index;
    connListener *listener;
    if (server.port != 0) {//listen for TCP,
        conn_index = connectionIndexByType(CONN_TYPE_SOCKET);
        if (conn_index < 0)
            serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET);
        listener = &server.listeners[conn_index];
        listener->bindaddr = server.bindaddr;//我们在配置文件配置bind关键字时,可以配置多个,因此是个数组
        listener->bindaddr_count = server.bindaddr_count;
        listener->port = server.port;
        listener->ct = connectionByType(CONN_TYPE_SOCKET);//获取CT_Socket
    }
    ......
     /* create all the configured listener, and add handler to start to accept */
    int listen_fds = 0;
    for (int j = 0; j < CONN_TYPE_MAX; j++) {
        listener = &server.listeners[j];
        if (listener->ct == NULL)
            continue;

        if (connListen(listener) == C_ERR) {//该函数会调用CT_Socket的listen函数,最后完成scoket,bind,listen的系统调用,并设置为非阻塞I/O
        //在CT_Socket下调用链路为connListen=>(socket.c文件)connSocketListen=>listenToPort=>anetTcpServer(完成
        //scoket,bind,listen的系统调用)  ,anetNonBlock(设置为非阻塞I/O)
            serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
            exit(1);
        }

        if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
            serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));
       listen_fds += listener->count;
    }
    ....
}

本函数将socket设置为可读状态,并绑定CT_Socket的accept_handler函数,即socket.c的connSocketAcceptHandler函数,用于处理客户端的连接建立

int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) {
    int j;

    for (j = 0; j < sfd->count; j++) {
        if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) {
            /* Rollback */
            for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
            return C_ERR;
        }
    }
    return C_OK;
}

aeMain

本函数用于进入主线程事件循环逻辑

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

本章中很多ae开头的函数,比如aeMain、aeCreateFileEvent、aeProcessEvents等,这些函数位于ae.c文件,是Redis事件管理器的具体实现,具体见下一章。

事件管理器

在上一章中知道了通过aeMain开启了事件循环,那么先看一下aeProcessEvents函数。
在聊aeProcessEvents函数之前我们先看下Redis如何根据系统选择操作系统提供的I/O库的。
在ae.c文件开头可以看到如下代码:

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c" //linux下选择epoll
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

aeProcessEvents

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
	...
	//执行initServer函数中注册的beforesleep回调函数
  if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);
    /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
  numevents = aeApiPoll(eventLoop, tvp);//获取socket事件,如果是epoll则是执行epoll_wait获取有事件的socket
  /* Don't process file events if not requested. */
 	if (!(flags & AE_FILE_EVENTS)) {
 			numevents = 0;
 	}
 	//执行initServer函数中注册的aftersleep 回调函数
 	if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
      eventLoop->aftersleep(eventLoop);    
        
	...
	for (j = 0; j < numevents; j++) {
	  //读事件
	  //在上一章的createSocketAcceptHandler已经了解到socket listen后第一次绑定了socket.c的connSocketAcceptHandler
	  //那么如果是accept事件就会调用connSocketAcceptHandler,继而调用networking.c的createClient
		if (!invert && fe->mask & mask & AE_READABLE) {
         fe->rfileProc(eventLoop,fd,fe->clientData,mask);
         fired++;
         fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
     }
		 //写事件
     if (fe->mask & mask & AE_WRITABLE) {
           if (!fired || fe->wfileProc != fe->rfileProc) {
                   fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                   fired++;
           }
     }
     /* If we have to invert the call, fire the readable event now  after the writable one. */
     if (invert) {
         fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
          if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc))
          {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
           }
      }
	}
}

读事件

socket的accept和read都是读事件。所以在CT_Socket下,客户端第一次建立连接时调用socket.c的connSocketAcceptHandler
accept之后等待read事件

static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);//调用系统函数accept,并设置socket为非阻塞的
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip);
    }
}

//在networking.c文件
void acceptCommonHandler(connection *conn, int flags, char *ip) {
		...
		/* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        char addr[NET_ADDR_STR_LEN] = {0};
        char laddr[NET_ADDR_STR_LEN] = {0};
        connFormatAddr(conn, addr, sizeof(addr), 1);
        connFormatAddr(conn, laddr, sizeof(addr), 0);
        serverLog(LL_WARNING,
                  "Error registering fd event for the new client connection: %s (addr=%s laddr=%s)",
                  connGetLastError(conn), addr, laddr);
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }
		...
}
client *createClient(connection *conn) {
	...
	if (conn) {
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);//重点。调用CT_Socket的set_read_handler函数,就socket.c文件的connSocketSetReadHandler
        connSetPrivateData(conn, c);
    }
	...
}

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;//设置read事件的回调函数为readQueryFromClient
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,//创建AE_READABLE事件,并绑定读回调函数为CT_Socket的ae_handler函数,等待aeProcessEvents循环触发进行回调
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

下面看看ae_handler函数,即scoket.c文件的connSocketEventHandler干了什么

static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
		/* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;//可以看到会调conn->read_handler函数,即connSocketSetReadHandler中绑定的readQueryFromClient
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}

readQueryFromClient 函数就是解析客户端数据了,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/570269.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【高级语言程序设计(一)】第 9 章:编译预处理命令

目录 前言 一、宏定义命令 &#xff08;1&#xff09;无参宏定义 &#xff08;2&#xff09;有参宏定义 ① 带参数的宏定义 ② 带参宏定义与函数的区别 二、文件包含命 &#xff08;1&#xff09;文件包含命令的定义 &#xff08;2&#xff09;文件包含命令的格式 &…

【Leetcode60天带刷】day02—— 977.有序数组的平方、209.长度最小的子数组、 59.螺旋矩阵II

题目&#xff1a;997.有序数组的平方 Leetcode原题链接&#xff1a;997.有序数组的平方——力扣 思考历程与知识点&#xff1a; 题目的意思很简单&#xff0c;就是把每个数的平方&#xff0c;按从小到大的顺序排个序&#xff0c;再输出出来。 第一想法是先每个数平方一遍&a…

Stream API的使用

使用Stream API对集合中的数据进行操作&#xff0c;就类似使用SQL语句对数据库执行查询 Stream不会存储数据Stream不会改变源对象&#xff0c;而是返回一个持有结果的新StreamStream是延迟执行的&#xff0c;只有在需要结果的时候才执行&#xff0c;即只有执行终止操作&#xf…

离散数学_十章-图 ( 2 ):图的术语和几种特殊的图

&#x1f4f7;10.2 图的术语和几种特殊的图 1. 基本术语1.1 邻接&#xff08;相邻&#xff09;1.2 邻居1.3 顶点的度1.4 孤立点1.5 悬挂点例题 2. 握手定理3. 握手定理的推论4. 带有有向边的图的术语4.1 邻接4.2 度——出度 和 入度4.3 例题&#xff1a; 5. 定理&#xff1a;入…

PHP 反序列化漏洞

PHP反序列化漏洞在实际测试中出现的频率并不高&#xff0c;主要常出现在CTF中。 PHP序列化概述 PHP序列化函数&#xff1a; serialize&#xff1a;将PHP的数据&#xff0c;数组&#xff0c;对象等序列化为字符串unserialize&#xff1a;将序列化后的字符串反序列化为数据&…

chatgpt赋能python:Python单词库的重要性

Python单词库的重要性 Python是一种高级编程语言&#xff0c;被广泛用于应用程序开发、网络编程、数据科学和人工智能开发等领域。而在Python编程中&#xff0c;单词库(或词典)的重要性不言而喻。单词库就是存放Python程序中经常使用的关键字、方法名、函数名等词汇的地方。本…

SpringBoot --- 实用篇

一、热部署 1.1、概念 什么是热部署&#xff1f;简单说就是你程序改了&#xff0c;现在要重新启动服务器&#xff0c;嫌麻烦&#xff1f;不用重启&#xff0c;服务器会自己悄悄的把更新后的程序给重新加载一遍&#xff0c;这就是热部署。 ​ 热部署的功能是如何实现的呢&…

谷歌浏览器被2345劫持

方法1&#xff1a; 打开控制面板的卸载程序&#xff0c;搜索2345&#xff0c;把那个恶心的“安全组件-2345”卸载掉&#xff01;&#xff01; 这个方法比修改 host 以及注册表要好使地多&#xff01; 参考网址&#xff1a; 【小技巧】修复chrome被2345劫持 方法2&#xff1a; …

Alma Linux 9.2、Rocky Linux 9.2现在是RHEL 9.2的替代品

随着Red Hat Enterprise Linux (RHEL) 9.2的发布&#xff0c;Alma Linux 9.2和Rocky Linux 9.2成为了RHEL 9.2的备选替代品。这两个Linux发行版旨在提供与RHEL兼容的功能和稳定性&#xff0c;以满足那些需要企业级操作系统的用户需求。本文将详细介绍Alma Linux 9.2和Rocky Lin…

nginx反向代理缓存

背景 nginx 一般用来做反向代理和负载均衡&#xff0c;将客户端请求发送到后端的 jetty&#xff0c;并将 jetty 的响应发送给客户端。后端的 jetty 通常不止一个&#xff0c;nginx 根据配置来选择其中一个 jetty&#xff0c;比较常见的选择策略是轮询。示意图如下 启动缓存支…

oracle19c介绍和安装

目录 一、版本 &#xff08;1&#xff09;历史 &#xff08;2&#xff09;11g和12c管理方式区别 11g 12C &#xff08;3&#xff09;各个版本对操作系统要求 二、分类 &#xff08;1&#xff09;分为桌面类和服务器类 &#xff08;2&#xff09;分为企业版和标准版 三…

基于遗传算法的BP神经网络优化算法(matlab实现)

1 理论基础 1.1 BP神经网络概述 BP网络是一类多层的前馈神经网络。它的名字源于在网络训练的过程中&#xff0c;调整网络的权值的算法是误差的反向传播的学习算法&#xff0c;即为BP学习算法。BP算法是Rumelhart等人在1986年提出来的。由于它的结构简单&#xff0c;可调整的…

个人网站实现微信扫码登录

⭐个人网站实现微信扫码登录 &#x1f948;效果图 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kzSrNgiv-1685034480658)(https://img.ggball.top/picGo/动画.gif)] &#x1f4d7;开发背景 为什么想用微信扫码登录呢&#xff1f; 起因是自己开发…

【CH32】| 02——常用外设 | GPIO

系列文章目录 【CH32】| 00——开发环境搭建 【CH32】| 01——新建工程 | 下载 | 运行 |调试 【CH32】| 02——常用外设 | GPIO 失败了也挺可爱&#xff0c;成功了就超帅。 文章目录 前言1. GPIO简介2. IO口的内部结构框图保护二极管上下拉电阻施密特触发器两个MOS管输出寄存器…

chatgpt赋能python:Python加速循环的执行方法详解

Python 加速循环的执行方法详解 Python是一门非常流行的编程语言&#xff0c;它可以在很多领域应用&#xff0c;比如Web开发、数据分析、机器学习等等。然而&#xff0c;Python执行速度较慢&#xff0c;特别是在循环语句中&#xff0c;代码执行效率会大打折扣。在本文中&#…

【基于ROS Melodic环境安装rosserial arduino】

【基于ROS Melodic环境安装rosserial arduino】 1. 简介2. 安装2.1 Ubuntu下的Arduino IDE安装2.2 Ubuntu下rosserial arduino软件安装2.3 安装ros_lib到Arduino IDE开发环境 3. 将ros_lib配置到 Arduino 环境库中4. 使用helloword5. 实验验证6.总结 1. 简介 这个教程展示如何…

Linux系统初始化命令的备忘单,Linux运维工程师收藏!

在管理和维护Linux系统时&#xff0c;有一些常用的命令可以帮助您进行系统初始化和配置。这些命令涵盖了各种任务&#xff0c;包括系统设置、用户管理、软件安装和网络配置等。 本文将为您提供一个Linux系统初始化命令的备忘单&#xff0c;以便在需要时方便查阅和使用。 系统设…

chatgpt赋能python:Python动画制作指南:从入门到精通

Python动画制作指南&#xff1a;从入门到精通 Python作为一种易学易用的编程语言&#xff0c;在数据分析、机器学习等领域已经得到广泛应用。但是你知道吗&#xff1f;Python还可以用来制作动画&#xff01;本文将为你介绍如何用Python制作动画&#xff0c;从入门到精通&#…

chatgpt赋能python:Python写计算器:从入门到精通

Python写计算器&#xff1a;从入门到精通 简介 计算器无疑是计算机编程中最基本且实用的工具之一。Python 作为一门易于学习且功能强大的编程语言&#xff0c;能够轻松实现计算器的功能。在本文中&#xff0c;我们将介绍如何使用 Python 编写一个简单的计算器。 如何实现&am…

chatgpt赋能python:Python的封装:提高代码的可维护性和可复用性

Python的封装&#xff1a;提高代码的可维护性和可复用性 在软件开发领域中&#xff0c;封装是一种重要的概念。它被用于隐藏程序的实现细节&#xff0c;使得程序的功能变得更加易于使用和维护。在Python编程中&#xff0c;封装是一种被广泛使用的技术&#xff0c;可以帮助你提…