Redis Reactor事件驱动模型源码

news2025/1/12 5:56:47

  前置学习:Redis server启动源码-CSDN博客

 Redis采用单线程Reactor模型

三个关键角色,即 reactor、acceptor、handler

三类处理事件,即连接事件、写事件、读事件。

建立连接(Acceptor)、监听accept、read、write事件(Reactor)、处理事件(Handler)

概念性的东西,实际简单一些,看源码简单些,定义事件、事件监听、事件分发处理。

注册事件

aeCreateEventLoop

初始化EventLoop,添加文件事件源、已就绪文件事件源、epoll事件源。

注册文件事件源、已就绪文件事件源
1、定义文件事件结构和已就绪事件结构
/* File event structure
 *
 * 文件事件结构
 */
typedef struct aeFileEvent {

    // 监听事件类型掩码,
    // 值可以是 AE_READABLE 或 AE_WRITABLE ,
    // 或者 AE_READABLE | AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE) */

    // 读事件处理器
    aeFileProc *rfileProc;

    // 写事件处理器
    aeFileProc *wfileProc;

    // 多路复用库的私有数据
    void *clientData;

} aeFileEvent;
/* A fired event
 *
 * 已就绪事件
 */
typedef struct aeFiredEvent {

    // 已就绪文件描述符
    int fd;

    // 事件类型掩码,
    // 值可以是 AE_READABLE 或 AE_WRITABLE
    // 或者是两者的或
    int mask;

} aeFiredEvent;
2、初始化事件源(EventLoop),并将文件事件结构和已就绪事件结构加入事件源中。
/* State of an event based program 
 *
 * 事件处理器的状态
 */
typedef struct aeEventLoop {

    // 目前已注册的最大描述符
    int maxfd;   /* highest file descriptor currently registered */

    // 目前已追踪的最大描述符
    int setsize; /* max number of file descriptors tracked */

    // 用于生成时间事件 id
    long long timeEventNextId;

    // 最后一次执行时间事件的时间
    time_t lastTime;     /* Used to detect system clock skew */

    // 已注册的文件事件
    aeFileEvent *events; /* Registered events */

    // 已就绪的文件事件
    aeFiredEvent *fired; /* Fired events */

    // 时间事件
    aeTimeEvent *timeEventHead;

    // 事件处理器的开关
    int stop;

    // 多路复用库的私有数据
    void *apidata; /* This is used for polling API specific data */

    // 在处理事件前要执行的函数
    aeBeforeSleepProc *beforesleep;

} aeEventLoop;

/*
 * 初始化事件处理器状态
 */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;aeMain

    // 创建事件状态结构
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

    // 初始化文件事件结构和已就绪文件事件结构数组
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    // 设置数组大小
    eventLoop->setsize = setsize;
    // 初始化执行最近一次执行时间
    eventLoop->lastTime = time(NULL);

    // 初始化时间事件结构
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;

    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;

    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    // 初始化监听事件
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;

    // 返回事件循环
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
 注册Epoll事件
 1、定义Epoll事件结构体
/*
 * 事件状态
 */
typedef struct aeApiState {

    // epoll_event 实例描述符
    int epfd;

    // 事件槽
    struct epoll_event *events;

} aeApiState;
2、添加epoll事件源到EventLoop中
/*
 * 创建一个新的 epoll 实例,并将它赋值给 eventLoop
 */
static int aeApiCreate(aeEventLoop *eventLoop) {

    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;

    // 初始化事件槽空间
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }

    // 创建 epoll 实例
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }

    // 赋值给 eventLoop
    eventLoop->apidata = state;
    return 0;
}

aeCreateFileEvent

初始化事件回调,往Eventloop里面添加读事件的处理回调,写事件的处理回调,Epoll事件的处理回调。

/*
 * 根据 mask 参数的值,监听 fd 文件的状态,
 * 当 fd 可用时,执行 proc 函数
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }

    if (fd >= eventLoop->setsize) return AE_ERR;

    // 取出文件事件结构
    aeFileEvent *fe = &eventLoop->events[fd];

    // 监听指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

    // 设置文件事件类型,以及事件的处理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    // 私有数据
    fe->clientData = clientData;

    // 如果有需要,更新事件处理器的最大 fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;

    return AE_OK;
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;

    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. 
     *
     * 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。
     *
     * 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。
     */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    // 注册事件到 epoll
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;

    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

    return 0;
}

aeCreateTimeEvent 

1、定义时间事件结构
/* Time event structure
 *
 * 时间事件结构
 */
typedef struct aeTimeEvent {

    // 时间事件的唯一标识符
    long long id; /* time event identifier. */

    // 事件的到达时间
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */

    // 事件处理函数
    aeTimeProc *timeProc;

    // 事件释放函数
    aeEventFinalizerProc *finalizerProc;

    // 多路复用库的私有数据
    void *clientData;

    // 指向下个时间事件结构,形成链表
    struct aeTimeEvent *next;

} aeTimeEvent;
2、往EventLoop添加事件源回调处理方法。 

/*
 * 创建时间事件
 */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    // 更新时间计数器
    long long id = eventLoop->timeEventNextId++;

    // 创建时间事件结构
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;

    // 设置 ID
    te->id = id;

    // 设定处理事件的时间
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    // 设置事件处理器
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    // 设置私有数据
    te->clientData = clientData;

    // 将新事件放入表头
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;

    return id;
}

事件分发

执行EventLoop事件源里面的所有事件回调

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 *
 * 处理所有已到达的时间事件,以及所有已就绪的文件事件。
 *
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪,
 * 或者下个时间事件到达(如果有的话)。
 *
 * If flags is 0, the function does nothing and returns.
 * 如果 flags 为 0 ,那么函数不作动作,直接返回。
 *
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。
 *
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。
 *
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。
 *
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * 如果 flags 包含 AE_DONT_WAIT ,
 * 那么函数在处理完所有不许阻塞的事件之后,即刻返回。
 *
 * The function returns the number of events processed. 
 * 函数的返回值为已处理事件的数量
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 获取最近的时间事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果时间事件存在的话
            // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            // 计算距今最近的时间事件还要多久才能达到
            // 并将该时间距保存在 tv 结构中
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }

            // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            
            // 执行到这一步,说明没有时间事件
            // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // 设置文件事件不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 文件事件可以阻塞直到有事件到达为止
                tvp = NULL; /* wait forever */
            }
        }

        // 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    }

    /* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1294895.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用高防IP防护有哪些优势

高防IP是针对互联网服务器在遭受大流量的DDoS攻击后导致服务不可用的情况下&#xff0c;推出的付费增值服务&#xff0c;用户可以通过配置高防IP&#xff0c;将攻击流量引流到高防IP&#xff0c;确保源站的稳定可靠。高防IP相当于搭建完转发的服务器。 高防IP有两种接入方式&a…

《Easy3d+Qt+VTK》学习

《Easy3dQtVTK》学习-1、编译与配置 一、编译二、配置注 一、编译 1、 资源下载&#xff1a;easy3d giuhub 2、解压缩 3、用qt打开CMakeLists.txt即可 4、点击项目&#xff0c;选择debug或者release&#xff0c;图中3处可自行选择&#xff0c;因为我的qt版本是6&#xff0c…

unity 2d 入门 飞翔小鸟 小鸟跳跃 碰撞停止挥动翅膀动画(十)

1、切换到动画器 点击make transition和exit关联起来 2、设置参数 勾选掉Has Exit Time 3、脚本给动画器传参 using System.Collections; using System.Collections.Generic; using UnityEngine;public class Fly : MonoBehaviour {//获取小鸟&#xff08;刚体&#xff09;p…

云原生系列1

1、虚拟机集群环境准备 VirtualBox类似vmware的虚拟化软件&#xff0c;去官网https://www.virtualbox.org/下载最新版本免费的&#xff0c;VirtualBox中鼠标右ctrl加home跳出鼠标到wins中。 VirtualBox安装步骤 https://blog.csdn.net/rfc2544/article/details/131338906 cent…

【广州华锐互动】VR煤矿生产事故体验系统为矿工提供一个身临其境的安全实训环境

随着科技的不断发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术已经逐渐走进我们的生活&#xff0c;为我们带来了前所未有的沉浸式体验。在许多领域&#xff0c;如教育、医疗、娱乐等&#xff0c;VR技术都发挥着重要作用。然而&#xff0c;当这项技术被用于模拟煤矿坍…

angular状态管理方案(ngrx)

完全基于redux的ngrx方案&#xff0c;我们看看在angular中如何实现。通过一个简单的计数器例子梳理下整个流程 一 安装 &#xff1a;npm i ngrx/store 这里特别要注意一点&#xff1a;安装 ngrx/store的时候会出现和angular版本不一致的问题 所以检查一下angular/core的版本…

东北地理所最新Nature通讯文章

作为城市的重要组织部分&#xff0c;城市湿地在水源供给、增湿降温、雨洪调蓄等多个方面发挥着极其重要的作用&#xff0c;2024年国际湿地日主题定为“湿地与人类福祉”。在此背景下&#xff0c;中国科学院东北地理与农业生态研究所毛德华研究员等在12月5日出版的Nature发表题为…

CentOS系统装机流程

目录 1、进入装机页面 2、配置分区 3、设置语言 4、软件安装&#xff08;我这里选的是最小化安装&#xff0c;一般情况下应该选Server&#xff09; 5、时区配置 ​编辑 6、Root登录密码 7、开始装机&#xff0c;重启后装机完成 1、进入装机页面 2、配置分区 3、设置语言…

Java编程中通用的正则表达式(一)

正则表达式&#xff08;Regular Expression&#xff0c;简称RegEx&#xff09;&#xff0c;又称常规表示法、正则表示、正规表示式、规则表达式、常式、表达式等&#xff0c;是计算机科学中的一个概念。正则表达式是用于描述某种特定模式的字符序列&#xff0c;特别是用来匹配、…

【技巧】RAR压缩文件如何解压?

RAR是一种文件压缩与归档的专利文件格式&#xff0c;很多时候在工作中都会使用到。既然是压缩格式&#xff0c;我们就需要解压才能得到里面的文件&#xff0c;对于电脑小白来说&#xff0c;可能不知道如何解压RAR文件&#xff0c;下面小编来分享一下。 解压压缩文件&#xff0…

57、postgresql 查询流程

在这里&#xff0c;我们简要概述了查询必须经过的阶段才能获得结果。 必须建立从应用程序到 PostgreSQL 服务器的连接。应用程序将查询传输到服务器&#xff0c;并等待接收服务器发回的结果。 解析器阶段检查应用程序传输的查询语法是否正确&#xff0c;并创建查询树。 重写系…

数字化和数智化一字之差,究竟有何异同点?

在2023杭州云栖大会的一展台内&#xff0c;桌子上放着一颗番茄和一个蛋糕&#xff0c;一旁的机器人手臂融入“通义千问”大模型技术后&#xff0c;变得会“思考”&#xff1a;不仅能描述“看”到了什么&#xff0c;还能确认抓取的是番茄而不是蛋糕。 “传统的机械臂通常都只能基…

Python-滑雪大冒险【附源码】

滑雪大冒险 《滑雪大冒险》是一款充满趣味性和挑战性的休闲竞技游戏&#xff0c;在游戏中&#xff0c;玩家将扮演一位勇敢的滑雪者&#xff0c;在雪山上展示他们的滑雪技巧&#xff0c;游戏采用2D图形界面&#xff0c;以第三人称视角呈现 运行效果&#xff1a;用方向键及方向键…

自定义按钮组/buttonGroup

样式结果 定义html <view class"button-group"><button :class"{ active: selectedButton index }"v-for"(button, index) in buttons" :key"index" click"handleButtonClick(index)">{{ button }}</button…

5G基站行业节能降耗 解决方案

截至2023年10月&#xff0c;我国5G基站总数达321.5万个&#xff0c;占全国通信基站总数的28.1%。然而&#xff0c;随着5G基站数量的快速增长&#xff0c;基站的能耗问题也逐渐日益凸显&#xff0c;基站的用电给运营商带来了巨大的电费开支压力&#xff0c;降低5G基站的能耗成为…

[Linux] LAMP架构

一、LAMP架构架构的概述 LAMP 架构是一种流行的 Web 应用程序架构&#xff0c;它的名称是由四个主要组件的首字母组成的&#xff1a; Linux&#xff08;操作系统&#xff09;&#xff1a; 作为操作系统&#xff0c;Linux 提供了服务器的基础。它负责处理硬件资源、文件系统管理…

【JNPF】好用、高性价比的低代码开发平台

目录 1.JNPF介绍 突出优势 2.JNPF的开放性与扩展性 平台的开放性&#xff1a; 平台高拓展性 在快速发展的软件开发领域&#xff0c;低代码平台已经成为了一种重要的开发方法&#xff0c;它使非专业开发人员也能够参与到软件开发中去&#xff0c;大大加速了软件开发的效率。…

积德无需人见,行善自有天知

人的一生中&#xff0c;多多少少都会做一些善事。有人是出于善心&#xff0c;有人是出于怜悯&#xff1b;有人是满足自己的虚荣心&#xff0c;有人是做给旁人看&#xff1b;有人是一时兴起&#xff0c;有人能坚持一辈子&#xff1b;有人捐助周围需要帮助的人&#xff0c;有人建…

python3(超详细-保姆级教程)

第一章&#xff1a; Python 是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言。 Python 的设计具有很强的可读性&#xff0c;相比其他语言经常使用英文关键字&#xff0c;其他语言的一些标点符号&#xff0c;它具有比其他语言更有特色语法结构。 Python 是一…

【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter

Flink 系列文章 一、Flink 专栏 Flink 专栏系统介绍某一知识点&#xff0c;并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分&#xff0c;比如术语、架构、编程模型、编程指南、基本的…