Redis网络相关的结构体 和 reactor模式

news2024/7/6 17:45:07

目录

1. epoll的封装

结构体aeApiStae

 创建epoll fd的封装

epoll_ctl的封装

epoll_wait的封装

2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent

结构体aeFileEvent

结构体aeFiredEvent 

结构体aeTimeEvent

3. struct aeEventLoop

 aeEventLoop相关的函数

1. 创建eventloop

2. 创建aeFileEvent

3. 删除aeFileEvent

4. 开始事件循环

4.什么时候设置回调函数的?

1. 绑定SleepProc

2. 绑定服务端的读事件回调函数

3. 绑定客户端的读事件回调函数

4. 绑定客户端的写事件回调函数

5.绑定时间事件

5. 有了IO多路复用,为什么还需要reactor模式?

IO多路复用与事件驱动

IO同步与异步的判断

reactor模式的优点

6. 通过Redis网络部分,学到如何实现reactor模式


上一章节讲解了Redis的网络交互流程,没有对关于网络部分的结构体有具体的讲解,所以该文章就主要讲解Redis网络部分使用的结构体。

1. epoll的封装

epoll有三个函数调用:

  • epoll_create:创建epoll实例,返回一个epoll fd,即是用于管理待检测的文件描述符的集合。
  • epoll_ctl:管理红黑树实例上的节点,可以进行添加、修改、删除操作。
  • epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)。等待就绪的fd,并返回就绪的fd,存储在events中,以及个数。

结构体aeApiStae

  • 其内部有变量epfd,即是调用epoll_create创建出来fd
  • *events即是就绪的fd存储的位置(数组)
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

 创建epoll fd的封装

主要流程:

  1. 申请内存空间给结构体aeApieState,申请内存空间给成员变量events。
  2. 使用epoll_create创建出epfd
//现在还没了解到aeEventLoop,可以先不用管aeEventLoop相关的,不影响看代码。
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    anetCloexec(state->epfd);
    eventLoop->apidata = state;
    return 0;
}

//重置aeApiState的events的大小,即是重置存放就绪fd的空间大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
    aeApiState *state = eventLoop->apidata;

    state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
    return 0;
}

//释放结构体aeApiState
static void aeApiFree(aeEventLoop *eventLoop) {
    aeApiState *state = eventLoop->apidata;

    close(state->epfd);
    zfree(state->events);
    zfree(state);
}

epoll_ctl的封装

主要是两个:添加和删除。

  • 添加的就是使用epoll_ctl(epfd,EPOLL_CTL_ADD ,....)
  • 删除时使用epoll_ctl(epfd,EPOLL_CTL_MOD,....)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    //得到要关注的事件类型mask
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

epoll_wait的封装

即是调用epoll_wait(...)。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    
    //eventLoop->setsize就是state->evnets数组的元素个数
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {    //遍历就绪的fd
            int mask = 0;    //mask就是该fd就绪的事件
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            //把就绪的fd存储到 eventLoop->fired[j]中
            //可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent

结构体aeFileEvent

一个客户端建立连接后,会把该客户端的fd添加到epoll上,并监听其关注的事件。假如当前fd需要监听读事件,那该fd读就绪,那对应的读事件函数就会被调用。

所以,可以把fd关注的事件fd就绪会执行的回调函数封装成一个结构体,Redis叫其是aeFileEvent。

  • mask就是fd关注的事件类型
  • rfileProc是读就绪会执行的回调函数wfileProc就是写就绪会执行的回调函数。这两个都需要后续去定义和注册
  • clientData就是回调函数中的参数
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

//回调函数的类型
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

//前面的形式可能不好看出,用c++11写的话,如下
//using aeFileProc=std::function<void(aeEventLoop* eventLoop,int fd,void* clientData,int mask)>;

 所以,客户端就会有自己的aeFileEvent,即是一个客户端就对应一个aeFileEvent。 当读或写就绪时,就会调用对应的函数

其实,服务端也是对应一个aeFileEvent,因为也要监听服务端的读事件。当客户端来连接,服务端读事件就绪,就会调用服务端的读事件回调函数进行accept。

所以就会有创建aeFileEvent,每个客户端或者一个服务端都会使用函数aeCreateFileEvent。(后续会详讲,先留个印象)

看到这里读者可能会有疑惑,不是说要对某个fd进行注册监听的吗,怎么该结构体没有fd的呢

那是因为aeFileEvent是存储在数组内。数组的下标就是该aeFileEvent对应的fd,也即是客户端对应的fd。(后续从代码中可以看出的)

结构体aeFiredEvent 

如名字一样,其就是就绪事件。一个就绪事件,就肯定是可以从中知道是哪个fd就绪,还有知道是哪种类型事件就绪。

  • fd表示epoll_wait返回的就绪fd
  • mask表示epol_wait返回的事件类型

epoll_wait返回的一个就绪fd就对应一个aeFireEvent。 

aeApiPoll就是把epoll_wait返回的就绪fd和事件类型 逐个赋值给aeFireEvent。

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;


//把epoll_wait返回的就绪fd和事件赋值给fired[j]
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    
    //eventLoop->setsize就是state->evnets数组的元素个数
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        numevents = retval;
        for (int j = 0; j < numevents; j++) {    //遍历就绪的fd
            int mask = 0;    //mask就是该fd就绪的事件
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;

            //把就绪的fd存储到 eventLoop->fired[j]中
            //可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

结构体aeTimeEvent

其是时间事件,可以认为是个定时器,定时会执行给定的函数。

从代码可以看到其是个双向链表。 

  • 每个时间事件都有一个事件id,aeEventLoop中的timeEventNextId是下一个要注册的时间事件id。
  • when_sec、when_ms是时间事件(定时器任务)的发生时间
  • timeProc是时间事件的处理回调函数。说明:aeTimeProc 需要返回一个 int 值,代表下次该超时事件触发的时间间隔。如果返回 - 1,则说明超时时间不需要再触发了,标记为删除即可
  • finalizerProc是时间事件要删除时的处理函数
//时间事件结构
typedef struct aeTimeEvent {
    long long id; //时间事件的唯一标识符,自增

    //事件的到达时间(即是执行时间)
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */

    //事件处理函数 (到期执行的回调函数)
    aeTimeProc *timeProc;

    //事件释放函数 (回调函数)
    aeEventFinalizerProc *finalizerProc;
    void *clientData;               //多路复用库的私有数据
    //双向链表
    struct aeTimeEvent *prev;   
    struct aeTimeEvent *next;
    int refcount;     //以防止计时器事件在递归时间事件调用中释放
} aeTimeEvent;

//时间事件回调函数类型
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);

//删除定时事件的回调函数类型
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

3. struct aeEventLoop

EventLoop是个事件循环,其主要功能就是实现 while(1){ epoll_wait(....) }。所以说是个事件循环,这也对epoll再进一步封装。

aeEventLoop是Reactor模型的具体抽象,把网络读写事件时间事件(定时器任务)可以统一处理。

//ae.h
// 事件循环
typedef struct aeEventLoop {
    int maxfd;       //已经注册的文件描述符的最大值
    int setsize;     //setsize是能注册的fd的最大值(epoll_wait函数中的参数maxevents)
    long long timeEventNextId;    //下一个要注册的时间事件id

    time_t lastTime;      //最后一次执行时间事件的时间
    aeFileEvent *events;     //是数组,已注册的文件事件 (就是IO event)
    aeFiredEvent *fired;     //数组,已就绪的文件事件

    aeTimeEvent *timeEventHead;    //定时器链表的头结点(头结点不一定是最早超时的任务)

    int stop;    //eventLoop的开关
    
    void *apidata; //具体的IO多路复用实现,即是前面讲的结构体aeApiState变量。
    aeBeforeSleepProc *beforesleep;//在处理事件前要执行的回调函数(即是在执行epoll_wait()之前)
    aeBeforeSleepProc *aftersleep;//在处理事件后要执行的回调函数(即是在执行epoll_wait()之后)
    int flags;          //设置的标识位
} aeEventLoop;

//进入循环等待之前的回调函数类型
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

从上面的注释知道 aeEventLoop的成员变量的用处。所以这里就只重点讲解两个变量*events 和 *fired

  • *event就是个数组,setsize是数组的元素个数。events的元素是aeFileEvent。前面说了一个客户端对应一个aeFileEvent。所以,events就是存储需要被监听的客户端。events数组下标是aeFileEvent中的fd。
  • fired数组是读写已就绪的 网络事件 数组。数组元素是记录了就绪的fd及其epoll_wait返回的事件类型

 aeEventLoop相关的函数

1. 创建eventloop

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    //分配内存给eventLoop
    eventLoop = zmalloc(sizeof(*eventLoop))
    //分配内存给eventLoop中的events 和 fired 数组
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);

    //设置其一些成员变量
    eventLoop->setsize = setsize;
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    //创建epoll fd,即是调用epoll_create, 内部把创建出来的apiState赋值给eventLoop->apidata
    aeApiCreate(eventLoop)

    //当前每个aeFileEvent都不关注任何事件
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
    //省略了些内存申请失败的处理................
}

2. 创建aeFileEvent

内部会调用aeApiAddEvent

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {    //判断客户端fd是否大于events数组元素个数
        errno = ERANGE;
        return AE_ERR;        //若是大于,返回错误
    }
    //从events数组中获取一个元素
    aeFileEvent *fe = &eventLoop->events[fd]; //从这可以看出events的下标就是客户端的fd

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)    //添加fd到epoll上,并关注事件mask
        return AE_ERR;
    fe->mask |= mask;    //设置结构体aeFileEvent关注的事件
    //设置回调函数
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;    //设置回调函数的参数
    
    //更新已监听的fd的最大值
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

//创建时间事件,即是定时器
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    //设置定时器的回调函数
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;    //设置回调函数的参数
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;    //从这句代码可以看出其是往头部插入的
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;    //更新链表头部为新插入的节点
    return id;
}

3. 删除aeFileEvent

通过传入的fd找到evnets数组中的元素,之后调用aeApiDelEvent进行删除。之后吧该位置的

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd >= eventLoop->setsize) return;
    //通过传入的fd找到events数组中的元素
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);    //进行删除
    fe->mask = fe->mask & (~mask);    //取消监听mask事件
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */    
        int j;
        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;    //更新已注册的fd的最大值
    }
}

4. 开始事件循环

aeMain就是一个while()循环,循环内部是aeProcessEvents函数。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

aeProcessEvents的主要步骤:

回调函数beforsleep和aftersleep可以先省略不关注的。因为目前其还没有用途,先知道有这两个回调函数就行。

  • 计算epoll_wait()需要的阻塞时间。
  • 执行beforsleep
  • epoll_wait等待事件发生。
  • 处理发生的IO事件,根据发生的事件类型来调用对应的回调函数:
    • 若是AE_READABLE类型调用rfileProc
    • 若是AE_WRITABLE类型调用wfileProc
  • 处理时间事件。若该时间事件是周期性的,执行完后会再添加到时间事件链表的。
//为了能更好理解读懂,该函数只展示了主体流程,有些细节是没有展示
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))){
        struct timeval tv, *tvp;
        //省略了epoll_wait函数参数timeout的计算过程,其不是框架的重点,后续可以再详细了解
        ...............................
        
        //执行befroesleep回调函数,这是在初始化server时绑定的,可以先不关注
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        //即是调用epoll_wait, 等待就绪的fd
        numevents = aeApiPoll(eventLoop, tvp);
        ...................
        //遍历执行已就绪的fd的回调函数
        for (int j = 0; j < numevents; j++) {
            //在aeApiPoll中把就绪的fd和事件赋值给了fired[j]
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            //得到就绪的fd和事件类型
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */
            
            //下面就执行对应的回调函数
            if (!invert && fe->mask & mask & AE_READABLE) {  //读就绪,执行读回调函数
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            if (fe->mask & mask & AE_WRITABLE) {    //写就绪,执行写回调函数
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            ......................
            processed++;
        }
    }
    
    if (flags & AE_TIME_EVENTS)    //flags有需要,就执行时间事件,即是执行定时器任务
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}

4.什么时候设置回调函数的?

看aeProcessEvents时候可能会有疑惑,调用fe->rfileProc,但是都不知道该函数是什么内容的。

因为这是通过绑定的。就是把一个函数赋值给fe->rfileProc。

从main()入手。

//server.c
int main(int argc, char **argv) {
   .......................
    initServer();
}

//initServer函数中就有绑定回调函数的
void initServer(void) {
    //省略了很多其他部分的内容
    //socket(...),bind(...),listen(...),创建服务端的流程....................
    ............................................
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

    /* Create an event handler for accepting new connections in TCP  sockets. */
    createSocketAcceptHandler(&server.ipfd, acceptTcpHandler)

    /* Register before and after sleep handlers (note this needs to be done
     * before loading persistence since it is used by processEventsWhileBlocked. */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    ................................
}

1. 绑定SleepProc

void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    eventLoop->beforesleep = beforesleep;
}

void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
    eventLoop->aftersleep = aftersleep;
}

在服务器初始化时候就绑定了这两个函数,那么在运行aeProcessEvents时候,就会调用函数beforesleepaftersleep

2. 绑定服务端的读事件回调函数

就是在createSocketAcceptHandler,其就是调用aeCreateFileEvent。sfd->fd[j]就是服务端fd,AE_READABLE就是需要监听的事件,即是监听服务端的读事件。回调函数是accept_handler,即是把accept_handler赋值给fe->rfileProc。

从函数aeProcessEvents,会返回就绪的fd和就绪的事件类型。当服务端fd返回的时候,是读事件就绪,就会调用fe->rfileProc而这时rfileProc就是accpet_handler,即是会调用accpet_handler。

//sfd是个数组,我们看源码的话,目前就当其是一个元素的即可,就不用使用for
//就把这个函数当做就是使用aeCreateFileEvent即可。
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
    for (int j = 0; j < sfd->count; j++) {
        aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL)
    }
    //错误处理没有展示
    return C_OK;
}

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    ................
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    .....................
}

回调函数acceptTcpHandler很明显会调用accpet去对客户建立连接的。

3. 绑定客户端的读事件回调函数

继续跟着函数acceptTcpHandler。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;

    while(max--) {
        //调用accept
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
         ............................
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    .................
    /* Create connection and client */
    client *c = createClient(conn))

    /* Last chance to keep flags */
    c->flags |= flags;
}

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    if (conn) {
        connSetReadHandler(conn, readQueryFromClient);    //设置客户端的读回调函数
        connSetPrivateData(conn, c);
    }
    ...................
    if (conn) linkClient(c);    //把该客户端添加到服务器server.client链表中保存
}

static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_read_handler(conn, func);
}

set_read_handler也是个回调函数,其设置成了是函数connSocketSetReadHandler。

这里set_read_handler也是个回调函数是因为有两种类型的connection,Redis是把一个客户端封装成一个connnection(该结构体后续会讲解)

//当前就认为conn->type->ae_handler是参数func即可,内部有很多兜兜转转
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

这样就把readQueryFromClient设置给了fe->rfileProc。当epoll_wait返回就绪的fd和事件,若该fd是客户端,也是读事件,那就会执行fe->rfileProc,即是执行readQueryFromClient

4. 绑定客户端的写事件回调函数

这个是在绑定的beforeSleep函数中。

void beforeSleep(struct aeEventLoop *eventLoop) {
    ..........................
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    .............................
}

int handleClientsWithPendingWritesUsingThreads(void) {
    ...................
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        //如果缓冲区中还有回复客户端的数据,就需要设置写回调函数,当epoll_wait返回客户端fd的写事件时候,就会执行sendReplyToClient
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
        }
    }
    ......................
}

static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_write_handler(conn, func, 0);
}

和set_read_handler一样,set_write_handler也是后面设置的。该函数是connSocketSetWriteHandler。

static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
    if (func == conn->write_handler) return C_OK;

    conn->write_handler = func;
    if (barrier)
        conn->flags |= CONN_FLAG_WRITE_BARRIER;
    else
        conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
    if (!conn->write_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
                    conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

这样就把 sendReplyToClient设置给了fe->wfileProc。当epoll_wait返回客户端fd的写事件时候,就会调用fe->wfileProc,即是执行sendReplyToClient。

5.绑定时间事件

使用aeCreateTimeEvent来绑定时间事件的回调函数。在aeProcessEvents函数内部会调用processTimeEvents去执行定时器任务。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    //设置事件的回调函数
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

5. 有了IO多路复用,为什么还需要reactor模式?

IO多路复用与事件驱动

首先要明确一点,reactor模式就是基于IO多路复用的。事件驱动也是IO多路复用的,不是说使用了reactor模式才是使用了事件驱动。

以事件为连接点,当有IO事件准备就绪时,就会通知用户,并且告知用户返回的是什么类型的事件,进而用户可以执行相对应的任务。这样就不用在IO等待上浪费资源,这便是事件驱动的核心思想。

比如你点了两份外卖,外卖A,外卖B。之后你无需时刻打电话去问外卖到了没。外卖到的时候,外卖员会打电话通知你。这中途你就可以做自己的事,不用纯纯等待。还有可以知道是外卖A到了还是外卖B到了,外卖员会告知是哪个外卖到的。

这个就是事件驱动。IO事件准备就绪时,会自动通知用户,并会告知其事件类型。

所以应该是,IO多路复用 + 回调机制 构成了 reactor模式

IO同步与异步的判断

还有reactor模式是同步的。因为其是使用IO多路复用的,而IO多路复用是同步的。

IO操作是同步还是异步,关键看数据在内核空间与用户空间的拷贝过程(数据读写的IO操作)

reactor模式的优点

网上很多说其可以很好处理高并发,但是我觉得IO多路复用也可以处理的。

还有说可扩展性,通过增加Reactor实例个数来充分利用CPU资源;那通过在其他线程再创建epoll也行的。

所以,我觉得一个很大的优势是:

  • 应用处理请求的逻辑,与事件分发框架完全分离,即是解耦,有很高的复用性

即写应用处理时,只需关注如何处理业务逻辑。Reactor框架本身与具体事件处理逻辑无关。

假如是把reactor模式做成一个网络库给用户使用。那用户就只需要关注处理请求的逻辑即可。该网络库对外开放setCallback函数。

假设,用户写服务器服务,收到客户端发来的数据(一串数字),想对数字做加法 或者想对数字做乘法都行。只要使用setCallback函数设置好处理请求的逻辑就行。这就是应用处理请求的逻辑,与事件分发框架完全分离,这是很方便的。

假如该框架是不对外开放的(就是用户不能使用setCallback),像Redis一样,为什么还需要使用reactor模式,为什么不可以只用IO多路复用,不用回调机制呢?

我认为,也是解耦的好处。Redis编写人员需要改变处理请求的逻辑时候,就只改变某个函数即可,不需要深入到epoll框架去改变,这就是很方便的。

6. 通过Redis网络部分,学到如何实现reactor模式

  1. 先对epoll进行封装,方便后续使用
  2. 需要对fd进行监听,并注册需要关注的事件类型,并注册关注的事件类型就绪时,需要执行的函数。所以,可以把fd关注的事件类型事件就绪执行的函数三者封装在一个结构体aeFileEvent内。
  3. 我们简单使用epoll时候,肯定是会写 while(1){ epoll_wait(....); }。这就是个事件循环,所以可以再封装个结构体EventLoop, 其也是对epoll的再次封装,即是会调用前面封装好的epoll的函数。EventLoop内部应该有存储aeFileEvent的数组。
    1. 在调用epoll_wait时,会返回就绪的fd和事件类型,把返回的(就绪fd和事件类型)数组 赋值给EventLoop的aeFileEvent数组。之后就是使用该aeFileEvent,根据事件类型执行对应的回调函数
  4. 提供setCallback函数,设置好对应的回调函数,即是把需要执行的函数 赋值给 aeFileEvent中的 事件就绪执行函数

Redis的reactor模式没有使用到结构体event_data_t的指针变量ptr。

若是想逐步实现recactor模式,这里推荐下本人写的0.仿造muduo,实现linux服务器开发思路

该文章专栏有详细的逐步实现的reactor模式的代码流程。欢迎查看。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1625498.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pycharm编辑器------快捷键

pycharm编辑器基础快捷键 上下文操作 01PyCharm 有数百个上下文相关操作&#xff0c;可以帮助您转换、改进和修正代码。按 AIt Enter 以调用“显示上下文操作"。 02我们来应用第一个快速修复:移除形参。 03您几乎可以在任何上下文中调用"显示上下文操作"。我们…

前端补充---15

一、新增表单 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title> </head&g…

C++11 数据结构7 队列的链式存储,实现,测试

前期考虑 队列是两边都有开口&#xff0c;那么在链式情况下&#xff0c;线性表的链式那一边作为对头好呢&#xff1f; 从线性表的核心的插入和删除算法来看&#xff0c;如果在线性表链表的头部插入&#xff0c;每次循环都不会走&#xff0c;但是删除的时候&#xff0c;要删除线…

IDEA中配置使用maven和配置maven的中央仓库

1 以汉化后的IDEA为例配置maven 打开idea选择文件 选择 设置 点击>构建.执行.部署 点击>构建工具 点击>Maven 其中Maven主路径 就是我们maven下载解压后的路径 可以通过边上的三个点选择你解压后的绝对路径&#xff0c;也可以直接把解压后的绝对路劲复制过来 以下…

C++之通俗易懂学模版

目录 一、了解什么是泛性编程 二、模版 1.函数模版 1.1 函数模板概念 1.2 函数模板格式 1.3 函数模板的原理 1.4 函数模板的实例化 1.5 模板参数的匹配原则 2.类模板 2.1 类模板的定义格式 2.2 类模板的实例化 3. 非类型模板参数 4. 模板的特化 4.1 概念 4.2 …

半导体晶圆厂内外网数据单向导出,什么样的方案才安全又便捷?

半导体晶圆厂企业为了隔绝外部⽹络有害攻击、保护⽹络和数据安全&#xff0c;通常采⽤物理隔离的⽅式&#xff0c;将企业内⽹与互联⽹隔离。⽹络隔离后&#xff0c;基于业务开展需求&#xff0c;部分重要数据仍需由内⽹导⼊及导出⾄外部⽹络区域。为保障数据的安全合规性&#…

【Qt常用控件】—— 多元素控件

目录 1.1 List Widget 1.2 Table Widget 1.3 Tree Widget 1.4 小结 Qt 中提供的多元素控件有: QListWidget QListView QTableWidget QTableView QTreeWidget QTreeView xxWidget 和 xxView 之间的区别 以 QTableWidget 和 QTableView 为例&#xff1a; QTableView 是基于…

Java:优先级队列(堆)

一、初识【堆】 1、什么是【优先级队列】&#xff1f; 前面的文章我们介绍过队列&#xff0c;队列是一种先进先出的数据结构&#xff0c;但是&#xff0c;在某些情况下&#xff0c;操作的数据可能需要有一个优先级来获取数据&#xff0c;例如优先获取队列中最大的元素&#xf…

这个合租室友真的没有一点公德心,还好他搬走了

这个合租室友真的没有一点公德心&#xff0c;还好他搬走了 这个出租屋有四个房间。 有三个卧室&#xff0c;和一个隔断。 我住三个卧室中的一个。下图中右边那个就是我住的。 2023年下半年&#xff0c;左边那个屋子来了一个新租户小白。 在住的过程中&#xff0c;隔断间的租…

致力于为企业提升媒体宣传的一种新策略-软文发稿和投放

随着新媒体时代的快速发展&#xff0c;媒体宣发的方式也在不断迭代&#xff0c;其中&#xff0c;“软文发稿”成为了许多企业非常看重的一种媒体宣发方式。那么&#xff0c;什么是“软文发稿”呢&#xff1f;这是一种通过撰写有新闻属性的广告文章&#xff0c;将企业的品牌、产…

武汉星起航:凭借专业优势,助力卖家孵化,推动跨境电商发展

在全球经济一体化的大背景下&#xff0c;跨境电商行业以其独特的优势&#xff0c;成为了推动国际贸易发展的重要力量。作为这一领域的佼佼者&#xff0c;武汉星起航电子商务有限公司积极践行“走出去”战略&#xff0c;凭借自营店铺运营经验和跨境电商资源的积累&#xff0c;利…

嵌入式学习59-ARM7(自动设备号和混杂设备)

知识零碎&#xff1a; 头文件查找&#xff1a; /arm/路径下的头文件 linux驱动程序的编写&#xff0c;编译&#xff0c;运行过程 -------------------------------------------------------------------------------------------------------------------------------- 1.…

photoshop如何使用PS中的吸管工具吸取软件外部的颜色?

第一步&#xff0c;打开PS&#xff0c;随意新建一个画布&#xff0c;或打开一个图片。 第二步&#xff0c;将PS窗口缩小&#xff0c;和外部窗口叠加放置&#xff0c;以露出后面的其它页面。 第三步&#xff0c;选中吸管工具&#xff0c;在PS窗口内单击一点吸取颜色&#xff0c;…

个人信息 | Q1违规通报分析出炉,通报量环比提升180%,工信部重点方向明晰!

为协助企业了解监管部门动向&#xff0c;爱加密基于移动应用安全大数据平台长期收集个人信息违规通报信息&#xff0c;并定期抽查新上架及有更新应用的个人信息合规情况&#xff0c;未来将定期公布部分分析结果&#xff0c;以便各企业了解监管侧及市场侧情况。 根据爱加密统计…

一文带你看懂多线程编程

title: 深入理解多线程编程 date: 2024/4/25 17:32:02 updated: 2024/4/25 17:32:02 categories: 后端开发 tags: 线程同步互斥锁死锁避免竞态条件线程池异步编程性能优化 第一章&#xff1a;多线程基础 1.1 线程概念与原理 线程&#xff1a;在操作系统中&#xff0c;一个…

探索SSH:常见功能与使用技巧

文章目录 远程登录密钥认证文件传输端口转发执行远程命令会话保持总结 SSH&#xff08;Secure Shell&#xff09;是一种安全网络协议&#xff0c;用于通过加密的方式在网络上安全地进行远程登录和执行命令。它是管理远程服务器和网络设备的重要工具之一。在本文中&#xff0c;我…

【TDengine】mac m1解决no taos in java.library.path

前言 使用macos搭建springbootmybatisplus&#xff0c;通过mqtt将数据更新到tdenigne 3.2.3&#xff0c;数据源使用远程服务器的tdengine。 问题 启动时报错&#xff1a; Caused by: java.lang.UnsatisfiedLinkError: no taos in java.library.path 以下是官方文档 打开本…

【电路笔记】-Colpitts振荡器

Colpitts振荡器 文章目录 Colpitts振荡器1、概述2、基本Colpitts 振荡器电路3、示例14、使用运算放大器的Colpitts振荡器5、总结Colpitts 振荡器设计使用两个中心抽头电容器与并联电感器串联,形成产生正弦振荡的谐振储能电路。 1、概述 在许多方面,Colpitts 振荡器与我们在上…

(开源版)企业AI名片S2B2C商城系统商业计划书

团队使命 擎动人工智能跃迁&#xff0c;融技术与商业之行 项目背景 话说2022年12月7日那天&#xff0c;国务院大大发布了个重磅消息&#xff0c;宣布咱们国家的三年抗疫大战终于告一段落&#xff0c;全面放开啦&#xff01;这意味着咱们的市场经济要重新焕发生机啦&#xff…

LeetCode //C - 38. Count and Say Medium Topics Companies

38. Count and Say The count-and-say sequence is a sequence of digit strings defined by the recursive formula: countAndSay(1) “1”countAndSay(n) is the way you would “say” the digit string from countAndSay(n-1), which is then converted into a differen…