reactor (百万并发服务器) -- 1

news2025/1/6 19:26:13

        为了从点滴开始,文章会先从一些基础socket去补充一些经常发生但是没有很深入去思考的细节。然后我们再开始去设计reactor的设计,可以选择跳过起过前面部分。

        为了能从0开始去设计,测试,优化...整个过程会分为2-3篇文章输出,喜欢的可以点歌关注哦。

socket的API理解

        这部分不过多详细的去解释,只是对使用过程中容易忽略的地方进行补充。很基础的部分不是很了解的部分,可以看我的另一篇文章。

C++项目实战-socket编程_c++ socket-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/weixin_46120107/article/details/126528923

    //-- 创建(分配)一个管理者sockfd

    // 参数1:表示创建套接字使用的协议族   -- AF_INET(IPv4地址) AF_INET6(IPv6地址) AF_UNIX(本地通信)
    // 参数2:表示创建套接字使用的协议类型     -- SOCK_STREAM(字节流套接字) SOCK_DGRAM(数据报套接字)
    // 参数3: 表示创建套接字使用的协议 -- 0(由操作系统自动选择适当的协议类型) IPPROTO_TCP(TCP) TPPROTO_UDP(UDP)
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

        为了更好的理解,我们可以把socket理解为一家门店招聘一个管理者或者一个小饭店的老板。那么现在我们有管理者了,现在需要把这个管理者安排到指定的门店去工作。

-- 选择门店(设置这个门店接待的客人类型)
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(serveraddr));

-- 表示可以进门的顾客是 AF_INET(IPv4协议族的)
serveraddr.sin_family = AF_INET;
// INADDR_ANY(0.0.0.0) 表示可以接待来着任何地方的顾客 
// 127.0.0.1/localhost: 只可以让本机访问,其他计算机是无法访问的(只接待内部人员)
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

-- 表示顾客可以从哪个门口进入
serveraddr.sin_port = htons(2048);


-- 将管理者安排到指定的门店
bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr));

        现在我们已经安排好我们的门店管理者(店长)了,我们还需要一个记录顾客进店的登记表,或者管理进店的秩序方式,防止发生踩踏事件,也为了公平起见,先来的客人,我们先进行服务。在计算机中采用队列的先进先出性质能很好的解决。

// --记录所有进门的顾客信息(连接)
// 创建等待队列,将所有经过端口连接的请求放到等待队列
// 如果队列已满,则新的请求就会被拒绝,这个数字并不是一个硬性要求,实际上系统会设定一个合适的值
listen(sockfd, 10);

        现在我们已经把管理方案和管理人员都安排好了,我们就可以开业了吗?等等,我们还需要记录顾客的信息呢?我们需要知道顾客从哪里来,这样我们才能更地道的为顾客提供服务。所以我们需要先准备一张表格,登记顾客的信息。

struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);

        到现在为止,我们就完事具备了,等待顾客的光临,然后为每一个顾客专门安排一个一对一个的服务员,我们主打一个服务。

// -- 等待顾客到来,当顾客当来,填充clientaddr表,然后由店长叫一个专门的服务员来为顾客提供服务
// 这个函数是阻塞的,可以设置成非阻塞的方式(使用fcntl将套接字socket设置成非阻塞的即可)
int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);

        接下来就是服务员与顾客之间的对话了。顾客提出需求,服务员进行解答和提供服务。

// 顾客提出要求,服务员通过recv的方式接收到 
recv(clientfd, buffer, 128, 0);

// 业务处理 --> 针对不同的需求,服务员会做出相应的处理方式
TODO

// 给顾客提供反馈,让顾客收获到快乐
send(clientfd, buffer, 128, 0);

        顾客总有离开的时候,这个时候我们需要把安排的服务员收回,下次让他为其他顾客提供服务。

close(clientfd);

        夜深了,门店需要打样了。我们要把管理者也收回。

 close(sockfd);

-------------------------------------------------------------------------------------------------------------------------

现在我们来实操一下(这里我们只进行TCP进行表述):

客户端我们就不写了,使用sockTools进行测试。

https://pan.baidu.com/share/init?surl=GqaKzEZWNvhXivm0FAnZngicon-default.png?t=N7T8https://pan.baidu.com/share/init?surl=GqaKzEZWNvhXivm0FAnZng提取码:s5wy

我们先思考2个问题:

  • 系统中出现大量TIME_WAIT状态的原因?
  • 系统中出现大量的CLOSE_WAIT状态的原因?

我们来复现第一种情况:系统中出现大量TIME_WAIT状态的原因?

#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <error.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>


int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket():");
        return -1;
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htons(INADDR_ANY);
    serveraddr.sin_port = htons(2048);   // 这里补充一个细节:端口1024以前是系统的,如果要用需要使用root权限

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
    if(-1 == clientfd)
    {
        perror("clientfd");
        return -1;
    }

    // char buffer[128] = {0}; 
    // while(1)
    // {
    //     int count = recv(clientfd, buffer, 128, 0);
    //     if(count == 0)
    //     {
    //         printf("断开\n");
    //         close(clientfd);
    //     }
    //     else 
    //     {
    //         send(clientfd, buffer, 128, 0);
    //     }
    // }

    close(sockfd);

    return 0;
}

我们来看下上面的代码,当我们连接之后,立马就退出了。然后就进入了TIME_WAIT状态,接下来我们怎么分析呢?

在TCP连接中,主动关闭的一方会进入TIME_WAIT状态,这样是不正常的。说明服务器会总是断开连接导致系统中出现大量的TIME_CLOSE状态。导致原因可能是进程异常退出(崩溃)...

我们来复现第二种情况:系统中出现大量的CLOSE_WAIT状态的原因?

#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <error.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>


int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket():");
        return -1;
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htons(INADDR_ANY);
    serveraddr.sin_port = htons(2048);   // 这里补充一个细节:端口1024以前是系统的,如果要用需要使用root权限

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
    if(-1 == clientfd)
    {
        perror("clientfd");
        return -1;
    }

    char buffer[128] = {0}; 
    while(1)
    {
        int count = recv(clientfd, buffer, 128, 0);
        if(count == 0)
        {
            printf("断开\n");
            // close(clientfd);
        }
        else 
        {
            send(clientfd, buffer, 128, 0);
        }
    }

    close(sockfd);

    return 0;
}

在TCP连接中,对端关闭连接后,收到FIN后发送ACK确认收到了要关闭的信息,随后进入CLOSE_WAIT状态,关闭后进入LAST_ACK状态。按这个道理分析,我们很容易得出当服务器未调用close出现在TCP状态中,对端调用close关闭连接后,服务器回送ACK,表明收到了消息后进入半连接状态,当服务器调用close后,退出CLOSE_WAIT状态。其实从字面就能猜出来,关闭等待嘛,合适关闭呢,调用close呗。说明连接没关闭,这样是很危险的,很容易造成描述符没有释放而程序崩溃。


这个话题到这里就结束了,后面遇到一些情况,会进行补充...

多线程/多进程服务器

        在上面的代码中我们很容易看出来,这个服务器最大的缺陷是什么?

        只能处理一个连接。我们来测试一下:

        连接1的情况:

        连接2的情况:

我们回到代码:

    int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
    if(-1 == clientfd)
    {
        perror("clientfd");
        return -1;
    }

    char buffer[128] = {0}; 

    // 循环
    while(1)
    {
        int count = recv(clientfd, buffer, 128, 0);
        if(count == 0)
        {
            printf("断开\n");
            close(clientfd);
        }
        else 
        {
            send(clientfd, buffer, 128, 0);
        }
    }

    close(sockfd);

   我们很容易看出来,我们没有为第二个连接提供专门服务员进行服务。那有什么办法吗?

   很容易想到,开线程/开进程,接下来我们就对我们的代码进行改进吧!!!

        哈哈,似乎问题得到的合理的解决,那么这样真的满足所有的需求吗?这里我才两个连接呢,如果我们有1w个连接同时在线呢? 很读朋友很快想到,那用线程池呗,其实不然。用线程池不是解决这个问题的核心,如果连接池的连接数拿到设置成1w吗?那如果有1w+1的连接呢,还不是要去创建往线程池中添加一个线程。

        这里插一个故事:apache的C10K问题。C10K问题是只支持一万个并发连接问题,在Apache中,每一个客户端请求的会分配一个独立的线程或者进程来处理。当并发请求增加时,系统将消耗更多的线程或者进程资源,这将导致内存和CPU资源的过度使用,从而影响服务器的性能。

        如何解决呢?怎么办呢?害,好像这个问题无解了。

        我们需要寻找在一种在一个线程中对应多个连接的方法 --> IO多路复用出现了

IO多路复用 

        在Linux中,IO多路复用主要包括了select/poll/epoll技术。接下来我们将对这个几个技术做一个的中的疑问提出一些看法,很基础的部分请跳到开头的提供的文章中。


        select

  • 为什么说select会受到1024的限制呢?
  • select的性能权限是什么呢?
  • select的使用上的缺点在哪呢?

我们先来使用和测试下select,验证下它是否能在一个线程中处理多个连接的问题。(传参的说明在代码中有注释,这里就不过多的重复了)

#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <error.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/select.h>


int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2048);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr)))
    {
        perror("bind()");
        return -1;
    } 

    listen(sockfd, 10);

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    // 定义监视IO集合
    fd_set rfds,rset;
    // 初始化时全部置为0
    FD_ZERO(&rfds);
    // 将需要监视的IO置1
    FD_SET(sockfd, &rfds);
    // 因为select内部的循环是 for(int i = 0; i < __nfds; ++i)
    // 为了减少循环,我们提前记录下来
    int maxFd = sockfd;

    int fds[1024] = {0};
    fds[maxFd] = 1;

    while(1)
    {
        
        rset = rfds;

        // 这里的+1很重要哦
        // 参数1: 表示需要监视的文件描述符数量+1
        // 参数2:表示需要监视的读事件集合
        // 参数3: 表示需要监视的写事件集合
        // 参数4:表示需要坚实的错误事件集合
        // 参数5: 超时时间,NULL表示阻塞方式
        // 返回: 就绪描述符个数
        int ready = select(maxFd + 1, &rset, NULL, NULL, NULL);

        // 对连接事件的处理
        if(FD_ISSET(sockfd, &rset))
        {
            struct sockaddr_in clientaddr;
            socklen_t len = sizeof(clientaddr);

            int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);

            // 将新的需要监视的IO加入到集合中
            FD_SET(clientfd, &rfds);

            maxFd = clientfd;

            if(clientfd > maxFd)
            {
                maxFd = clientfd;
            }

            fds[clientfd] = 1;
        }

        for(int i = sockfd + 1; i <= maxFd; ++i)
        {
            if(FD_ISSET(i, &rset))
            {
                char buffer[128] = {0};
                int count = recv(i, buffer, 128, 0);
                if(count == 0)
                {
                    printf("断开 %d\n", i);
                    fds[i] = 0;
                    FD_CLR(i, &rfds);
                    close(i);

                    // 如果当前最大的被关闭了,则需要更新(减小无用循环)
                    if(i == maxFd)
                    {
                        for(int k = maxFd; k > sockfd + 1; --k)
                        {
                            if(fds[i] == 1)
                            {
                                maxFd = k;
                            }
                        }
                    }

                    continue;
                }
                send(i, buffer, count, 0);
            }
        }
    }
   
    close(sockfd);  

    return 0;
}

我们来复现第一种情况:为什么说select会受到1024的限制呢?

//-------------------------------------------//
#define __FD_SETSIZE		1024
//-------------------------------------------//

//-------------------------------------------//
#define __NFDBITS	(8 * (int) sizeof (__fd_mask))
//-------------------------------------------//

//-------------------------------------------//
typedef long int __fd_mask;
//-------------------------------------------//

/* fd_set for select and pselect.  */
typedef struct
  {
    /* XPG4.2 requires this member name.  Otherwise avoid the name
       from the global namespace.  */
#ifdef __USE_XOPEN
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
    __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
  } fd_set;

这里面很重要的一个部分: __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]

__fd_mask --> long int

__FD_SETSIZE --> 1024

__NFDBITS --> long int

__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]-->long int fds_bits[128]-->8 * 128个字节 --> 1024个位-->每一个位对应一个IO,分别有1和0两种状态对应事件的发生或者没有发生。

select的性能权限是什么呢?

我们在写上面select的过程中就能体会到select的一些权限。

1.我们需要重复的复制集合,而且在select的内部,需要将集合从用户区复制到内核区,等有事件到来,又需要将数据从内核区复制到用户区。

2.每次的需要遍历整个IO集合。

select的使用上的缺点在哪呢?

1.参数太多,容易搞错。一共5个参数,用户体验不好。

2.每次需要把待检测的IO集合进行拷贝,对性能有影响。

3.对IO的数量有限制。


        poll

        对于poll而言,本质上是对select的改进。但是只停留在了表面,并没有解决性能问题,只是在参数层面做了优化和解除了通过宏定义来设置的限制。本质上没啥好说的,我们就来简单实现下吧,熟悉熟悉代码。

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/poll.h>

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(struct sockaddr_in));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2048);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);
    
    // 准备一个事件消息的数组,这个长度由用户定义 --> 突破了select通过宏定义写死的缺陷
    struct pollfd fds[1024] = {0};
    fds[sockfd].fd = sockfd;
    fds[sockfd].events = POLLIN;
    int fd_in[1024] = {0};
    int maxFd = sockfd;
    fd_in[sockfd] = 1;

    while(1)
    {
        int nready = poll(fds, maxFd + 1, -1);

        if(fds[sockfd].revents & POLLIN)
        {
            struct sockaddr_in clientaddr;
            socklen_t len = sizeof(clientaddr);

            int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
            fds[clientfd].fd = clientfd;
            fds[clientfd].events = POLLIN;

            fd_in[clientfd] = clientfd;
            maxFd = clientfd;
        }

        int i = 0;
        for(i = sockfd + 1; i <= maxFd; ++i)
        {
            if(fds[i].revents & POLLIN)
            {
                char buffer[128] = {0};
				int count = recv(i, buffer, 128, 0);
                if(count == 0)
                {
                    fds[i].fd = -1;
                    fds[i].events = 0;

                    close(i);
                    continue;
                }

                send(i, buffer, count, 0);
            }
        }

    }


    return 0;
}


        epoll

        epoll对于Linux来说太重要了,如果没有epoll的存在,Linux只能停留在设备相关了开发了,epoll解决了select的问题。从设计层面来说,摒弃了之前的数组思维。那么epoll怎么设计的呢?为了方便理解,我们举一个例子,现在有一个快递站点,有用户,有一个快递员。对于用户来说寄快递只需要将快递放到快递站点,取快递只需要到快递站点去就行了,不用关系之后会不会有用户搬出和新用户的入住,将这层分割出来了。

        我们来用下吧。

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/epoll.h>

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    
    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(struct sockaddr_in));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2048);

    bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr));

    listen(sockfd, 10);

    // 这里的参数没有意义,为了兼用2.6以前版本。传参时需要大于0
    int epfd = epoll_create(1);

    // 一个epoll事件
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;

    // 把连接事件加入到集合中(本质上是一个红黑树)
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

    // 存储epoll事件,这里我们暂定位1024
    struct epoll_event events[1024] = {0};

    while(1)
    {
        int nready = epoll_wait(epfd, events, 1024, -1);
        int i = 0;
        for(int i = 0; i < nready; ++i)
        {
            int connfd = events[i].data.fd;
            if(sockfd == connfd)
            {
                struct sockaddr_in clientaddr;   
                socklen_t len = sizeof(clientaddr);
                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
                
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
            }
            else if(events[i].events & EPOLLIN)
            {
                char buffer[10] = {0};
                int count = recv(connfd, buffer, 10, 0);
                if(count == 0)
                {
                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    close(i);
                    continue;
                }

                send(connfd, buffer, count, 0);
            }
        }
    }

    close(sockfd);

    return 0;
}

用法很简答哈,其实这样的代码有点问题,不能让epoll实现百万级别的并发。这里我们先不讨论,我们后面封住的时候会一步步优化代码的。

        epoll有存在两种触发机制,水平触发(LT)和边缘触发(ET),所谓的水平触发就是当缓冲区有数据的时候会一直触发,知道缓冲区为空。而边缘触发是指来数据的时候,触发一次。怎么测出来这样的效果,我们把接收的buffer变小,然后分别修改为ET和LT及上打印,很容易看出区别。篇幅问题,这里就演示了,很简答的。

        我们先提出两个问题:

        1.epoll里面有没有mmap?

        2.epoll是不是线程安全的?

        3.什么时候用水平触发合适,什么时候用边缘触发合适?

        对于第一个和第二个问题,我们需要阅读epoll的源码(路径:eventpoll.c),这里带大家一起看一下。(源码:The Linux Kernel Archives) --> 建议拷过来

        epoll里面有没有mmap?

        通过查看eventpoll.c,并没有发现关于mmap相关的API出现,可见epoll中是不包含mmap的。epoll的性能好是取决于他的设计方式和数据结构,减少遍历次数,不需要像select和poll一样需要将整个数组遍历,只需要将返回的列表中就绪的进行操作,使用红黑树结构能够将查找效率从O(N)缩减至log(N),当并发量越大时,效果越明显。

        epoll是不是线程安全的?

        通过查看eventpoll.c,它的过程是在内核太完成的,不涉及到用户空间和多线程竞争关系。但值得注意的时,如果在,当需要多个线程或者多个进程同时操作同一个epoll的文件描述符时,需要注意红黑树和就绪队列所在的空间存在竞争问题。

        什么时候用水平触发合适,什么时候用边缘触发合适?

        无论什么场景使用水平触发和边缘触发都可以实现,需要考虑的是在什么时候使用哪种方式会更合适。

        边缘触发对于类似于代理的那种方式,比如不对数据进行处理,直接转发的情况下使用边缘触发。当业务处理比较慢的情况,读缓冲区数据比较多的情况,需要处理完业务然后再此去读,这时候使用水平触发。

        select和poll使用的都是水平触发方式。


        我们总算是把socket基础部分讲完了,现在我们要正式的开始进行reactor的封装了。

reactor

        通常情况下,在epoll对IO的处理有两种方式,一种是面向IO的处理模式,一只种是面向事件的处理方式。

        面向IO的方式不方便封装和维护,面向事件的方式发生什么事件就调用什么回调函数,例如发生读IO事件,那么就调用响应的读回调函数。换一种说法,通过面向连接的方式能够将IO事件与读写存储以及事件回调封装到一起。

        对于reactor的封装,我们有要采用面向事件的方式,事实上reactor也是这么做的。


        step01:首先我们需要考虑处理的对象是什么?

        ---> 能够想到我们需要处理的是IO

        step02:然后我们需要考虑如何封装?

        --> 对于一个IO来说有哪些属性或者行为,IO对应的文件描述符,读数据、写数据、各种事件的处理。看起来就这么多。

        stop03:代码怎么体现呢?        

        --> 使用结构体或者类将他们组装在一起。所以我们可以设计以下的结构:

struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    // 事件处理 - 连接、读事件、写事件
    RCALLBACK accept_callback;
    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

        到目前为止,看起一切都是这么的顺其自然。

        stop04:现在我们已经设计好了一个IO需要处理的结构了,那么在使用的过程中,无可厚非会有很多个IO,为了方便我们定义一个数组来存储这些。当然你可以选择其他的数据结构进行存储。

struct conn_item connlist[1024] = {0};

        stop05:总的设计思路我们确定了下来,接下来我们需要把整个使用的框架先初步搭起来,因为这里我们使用epoll来做,本质上就是epoll的那一套。

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH		1024

typedef int (*RCALLBACK)(int fd);

struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    // 事件处理 - 连接、读事件、写事件
    RCALLBACK accept_callback;
    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

struct conn_item connlist[1024] = {0};

int accept_cb(int fd)
{
    return 0;
}

int read_cb(int fd)
{
    return 0;
}

int write_cb(int fd)
{
    return 0;
}

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket");
        return -1;
    }

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2048);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    // 将sockfd放入到监视集合中
    connlist[sockfd].fd = sockfd;
    connlist[sockfd].accept_callback = accept_cb;

    // 这里我们使用epoll
    int epfd = epoll_create(1);
    struct epoll_event ev;
    ev.data.fd = sockfd;
    ev.events = EPOLLIN;

    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

    struct epoll_event events[1024] = {0};  // 就绪事件
    while(1)
    {
        int nready = epoll_wait(epfd, events, 1024, -1);

        for(int i = 0; i < nready; ++i)
        {
            // 发生什么事件,就处理什么事件
            if(events[i].events & EPOLLIN)
            {
                // 连接事件
                if(events[i].data.fd == sockfd)
                {
                    // struct sockaddr_in clientaddr;
                    // socklen_t len = sizeof(clientaddr);
                    // int connfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);

                    // ev.data.fd = connfd;
                    // ev.events = EPOLLIN;

                    // epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

                    // connlist[connfd].fd = connfd;
                    // connlist[connfd].read_callback = read_cb;
                    // connlist[connfd].write_callback = write_cb;

                    connlist[sockfd].accept_callback(sockfd);
                }
                else 
                {
                    // 读事件
                    int connfd = events[i].data.fd;
                    connlist[connfd].read_callback(connfd);
                }
            }
            else if(events[i].events & EPOLLOUT)
            {
                // 写事件
                int connfd = events[i].data.fd;
                connlist[connfd].write_callback(connfd);
            }
        }
    }


    return 0;
}

        stop06:现在我们做完了大部分工作了,只需要将回调函数实现就可以了。之后我们就开始优化我们打代码。

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH		1024

typedef int (*RCALLBACK)(int fd);

int accept_cb(int fd);
int read_cb(int fd);
int write_cb(int fd);


struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    // 事件处理 - 连接、读事件、写事件
    RCALLBACK accept_callback;
    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

struct conn_item connlist[1024] = {0};
int epfd;

int accept_cb(int fd)
{
    // 创建连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    int connfd = accept(fd, (struct sockaddr*)&clientaddr, &len);

    struct epoll_event ev;
    ev.data.fd = connfd;
    ev.events = EPOLLIN;

    epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

    connlist[connfd].fd = connfd;
    connlist[connfd].read_callback = read_cb;
    connlist[connfd].write_callback = write_cb;

    return connfd;
}

int read_cb(int fd)
{
    char* buffer = connlist[fd].rbuffer;
    int index = connlist[fd].rlen;

    int count = recv(fd, buffer + index, BUFFER_LENGTH - index, 0);
    if(0 == count)
    {
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
        close(fd);
        return 0;
    }
    connlist[fd].rlen += count;

    // TODO
    printf("buffer:%s\n", buffer);
    memcpy(connlist[fd].wbuffer, buffer, count);
    connlist[fd].wlen = connlist[fd].rlen;

    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLOUT;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

    return count;
}

int write_cb(int fd)
{
    char* buffer = connlist[fd].wbuffer;
    int index = connlist[fd].wlen;

    printf("%s\n", buffer);
    int count = send(fd, buffer, index, 0);

    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

    return count;
}

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket");
        return -1;
    }

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2480);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    // 将sockfd放入到监视集合中
    connlist[sockfd].fd = sockfd;
    connlist[sockfd].accept_callback = accept_cb;

    // 这里我们使用epoll
    epfd = epoll_create(1);
    struct epoll_event ev;
    ev.data.fd = sockfd;
    ev.events = EPOLLIN;

    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

    struct epoll_event events[1024] = {0};  // 就绪事件
    while(1)
    {
        int nready = epoll_wait(epfd, events, 1024, -1);

        for(int i = 0; i < nready; ++i)
        {
            // 发生什么事件,就处理什么事件
            if(events[i].events & EPOLLIN)
            {
                // 连接事件
                if(events[i].data.fd == sockfd)
                {
                    connlist[sockfd].accept_callback(sockfd);
                }
                else 
                {
                    // 读事件
                    int connfd = events[i].data.fd;
                    connlist[connfd].read_callback(connfd);
                }
            }
            else if(events[i].events & EPOLLOUT)
            {
                // 写事件
                int connfd = events[i].data.fd;
                connlist[connfd].write_callback(connfd);
            }
        }
    }

    close(sockfd);

    return 0;
}

      

  stop07:接下来我们来优化我们的代码,先在代码层进行。把一些公共的操作提取出来

  很明显,我们可以看到大量的epoll操作,这部分可以提到一个函数中:

void setEpoll(int fd, int mode)
{
    switch (mode) 
    {
        case 1:  // 添加
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
            break;
        }
        case 2:  // EPOLLOUT --> EPOLLIN
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
            break;
        }
        case 3:  // EPOLLIN --> EPOLLOUT
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLOUT;
            epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
            break;
        }
        case 4:  // 删除
        {
            epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
            break;
        }
        default:
            break;
    }
}

观察者一块,能不能融到一起

        我们想一想,这两个逻辑是不会同时发生的,而且发的IO事件肯定不是同一个,因为accept_callback是在sockfd中发生的,而read_callback是在connfd中发生的。每一个IO对应一个conn_item。所以这个很适合使用联合体进行。

【联合体:允许在同一内存空间中存储不同数据类型的成员变量,但同一时间只能使用其中一个成员变量。联合体的所有成员变量从同一个地址开始,占用的空间大小取最大成员变量的大小,这使得联合体在底层内存上非常高效】

        更有甚者,连接事件在epoll中也是读事件,而且使用是函数指针,发生事件的IO又不同,所以我们可以这样设计:(其他地方代码最相应修改即可)

struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    // 事件处理 - 连接、读事件、写事件
    // union
    // {
    //     RCALLBACK accept_callback;
    //     RCALLBACK read_callback;
    // }recv_t;

    // RCALLBACK accept_callback;
    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

完整代码

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH		1024

typedef int (*RCALLBACK)(int fd);

int read_cb(int fd);
int write_cb(int fd);
void setEpoll(int fd, int mode);

struct conn_item
{
    int fd;   // 一个IO事件对应的文件描述符    
                              
    char rbuffer[BUFFER_LENGTH];        // 读缓冲区
    int rlen;
    char wbuffer[BUFFER_LENGTH];        // 写缓冲区
    int wlen;

    RCALLBACK read_callback;
    RCALLBACK write_callback;
};

struct conn_item connlist[1024] = {0};
int epfd;

int accept_cb(int fd)
{
    // 创建连接
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    int connfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    
    printf("connfd:%d\n", connfd);

    connlist[connfd].fd = fd;
    connlist[connfd].read_callback = read_cb;
    connlist[connfd].write_callback = write_cb;

    setEpoll(connfd, 1);

    return connfd;
}

int read_cb(int fd)
{
    char* buffer = connlist[fd].rbuffer;
    int index = connlist[fd].rlen;

    int count = recv(fd, buffer + index, BUFFER_LENGTH - index, 0);
    if(0 == count)
    {
        setEpoll(fd, 4);
        close(fd);
        return 0;
    }
    connlist[fd].rlen += count;

    // TODO
    printf("buffer:%s\n", buffer);
    memcpy(connlist[fd].wbuffer, buffer, count);
    connlist[fd].wlen = connlist[fd].rlen;

    setEpoll(fd, 3);

    return count;
}

int write_cb(int fd)
{
    char* buffer = connlist[fd].wbuffer;
    int index = connlist[fd].wlen;

    printf("%s\n", buffer);
    int count = send(fd, buffer, index, 0);

    setEpoll(fd, 2);

    return count;
}

void setEpoll(int fd, int mode)
{
    switch (mode) 
    {
        case 1:  // 添加
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
            break;
        }
        case 2:  // EPOLLOUT --> EPOLLIN
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
            break;
        }
        case 3:  // EPOLLIN --> EPOLLOUT
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = EPOLLOUT;
            epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
            break;
        }
        case 4:  // 删除
        {
            epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
            break;
        }
        default:
            break;
    }
}

int main()
{

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == sockfd)
    {
        perror("socket");
        return -1;
    }

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(2480);

    if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))
    {
        perror("bind");
        return -1;
    }

    listen(sockfd, 10);

    // 将sockfd放入到监视集合中
    connlist[sockfd].fd = sockfd;
    connlist[sockfd].read_callback = accept_cb;

    // 这里我们使用epoll
    epfd = epoll_create(1);

    setEpoll(sockfd, 1);

    struct epoll_event events[1024] = {0};  // 就绪事件
    while(1)
    {
        int nready = epoll_wait(epfd, events, 1024, -1);

        for(int i = 0; i < nready; ++i)
        {
            // 发生什么事件,就处理什么事件
            int connfd = events[i].data.fd;
            if(events[i].events & EPOLLIN)
            {
                // 读事件
                connlist[connfd].read_callback(connfd);
            }
            else if(events[i].events & EPOLLOUT)
            {
                // 写事件
                connlist[connfd].write_callback(connfd);
            }
        }
    }

    close(sockfd);

    return 0;
}

后续 

        由于篇幅问题,已经在逐步实现reactor过程中设计到不同的知识点,我们不方便一次完成,如果感兴趣可以点个关注。

        后续文章将进行wsl测试已经对buffer进行优化,设计合理的用户缓冲区,而不是使用定长的buffer进行实现。

        然后会对代码进行调整,目前的设计不方便提取出来使用,我们最后会封装成一个库的方式,提供一个.h和.c文件,以此方便移植。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1155809.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【NI-DAQmx入门】传感器基础知识

1.什么是传感器&#xff1f; 传感器可将真实的现象&#xff08;例如温度或压力&#xff09;转换为可测量的电流和电压&#xff0c;因而对于数据采集应用必不可少。接下来我们将介绍您所需的测量类型及其对应的传感器类型。在开始之前&#xff0c;您还可以先了解一些传感器术语&…

优化python程序执行速度

1、问题背景 最近使用python编写的一个蓝牙应用程序&#xff0c;在使用过程中发现传输大量数据时会产生丢包&#xff0c;导致无法接收到完整的数据包。蓝牙接收程序的代码如下。 #蓝牙数据接收处理线程def bt_recv_thread(self):recv_time time.time()while(self.thread_run)…

BUUCTF 神秘龙卷风 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 神秘龙卷风转转转&#xff0c;科学家用四位数字为它命名&#xff0c;但是发现解密后居然是一串外星人代码&#xff01;&#xff01;好可怕&#xff01; 密文&#xff1a; 下载附件&#xff0c;解压得到一个.rar压缩…

动态蛇形卷积管状结构分割【CVPR 2023】

论文下载地址&#xff1a;Excellent-Paper-For-Daily-Reading/nn-block at main 类别&#xff1a;模块 时间&#xff1a;2023/10/31 摘要 血管和道路等管状结构在各种临床和自然环境中具有极其重要的意义&#xff0c;在这些环境中&#xff0c;精确分割对于下游任务的准确性…

C++ Set

定义 set不同于vector,strin,list这种存储容器&#xff0c;set是一种关联式容器&#xff0c;底层是搜二叉&#xff1b; 功能 set可以确定唯一的值&#xff0c;可以排序去重。 接口 insert() #include <iostream> #include<set> using namespace std;int main…

机泵设备如何通过设备健康管理平台实施预测性维护

机泵设备在工业生产中起着至关重要的作用&#xff0c;但长时间运行和频繁使用容易引发各种故障。为了提高机泵设备的可靠性和效率&#xff0c;预测性维护成为一种重要的管理策略。设备健康管理平台作为一种先进的工具&#xff0c;为机泵设备的预测性维护提供了有力支持。本文将…

第七届山东省黄炎培职业教育创新创业大赛圆满结束

山东省黄炎培职业教育创新创业大赛作为职教领域的一项品牌赛事&#xff0c;自举办以来&#xff0c;参赛院校覆盖面不断扩大&#xff0c;大赛水平和社会影响力不断提高&#xff0c;已成为全省职业教育领域的品牌赛事&#xff0c;是激发创新创业活力的重要抓手和有效载体&#xf…

UIAlertController 修改 title 或 message 样式相关

UIAlertController 文字换行后默认对齐方式为居中,若想调整其相关样式属性可以借鉴如下方式进行修改,具体实现方式 code 如下: NSString *msg "1、注销≠退出登录;\n注销:对不再使用的账号进行清空移除;注销后,App中数据将全部丢失,不可再找回;\n2、注销后,与账号相关的…

【Linux】配置JDKTomcat开发环境及MySQL安装和后端项目部署

目录 一、jdk安装配置 1. 传入资源 2. 解压 3. 配置 二、Tomcat安装 1. 解压开启 2. 开放端口 三、MySQL安装 1. 解压安装 2. 登入配置 四、后端部署 1. 数据库 2. 导入.war包 3. 修改端口 4.开启访问 一、jdk安装配置 打开虚拟机 Centos 登入账号&#xff…

控梦术(一)之什么是清明梦

控梦术 首先&#xff0c;问大家一个问题。在梦中&#xff0c;你知道自己是在做梦吗&#xff1f;科学数据表明&#xff0c;大约23%的人在过去一个月中&#xff0c;至少有一次在梦中意识到自己正在做梦。科学家把这叫做清醒梦或者叫做清明梦。科学家说&#xff0c;每个人都能学会…

【C++ 系列文章 -- 程序员考试 下午场 C++ 专题 201711 】

文章目录 1.1 C 题目六1.1.1 填空&#xff08;1&#xff09;详解1.1.2 填空&#xff08;2&#xff09;详解1.1.2.1 C this 的使用 1.1.3 填空&#xff08;3&#xff09;详解1.1.4 填空&#xff08;4&#xff09;详解1.1.5 填空&#xff08;5&#xff09;详解1.1.6 填空&#xf…

心理咨询预约小程序

随着微信小程序的日益普及&#xff0c;越来越多的人开始关注如何利用小程序来提供便捷的服务。对于心理咨询行业来说&#xff0c;搭建一个心理咨询预约小程序可以大大提高服务的效率和用户体验。本文以乔拓云平台为例&#xff0c;详细介绍如何轻松搭建一个心理咨询预约小程序。…

2023-简单点-yolox代码

yolox代码 yolox结构瞄一眼net代码常规convDWConv第一步第二步 CBA一套FocusSPPCSPDarknetFPNPANdecoupled predict headref yolox结构瞄一眼 net代码 #!/usr/bin/env python3 # -*- coding:utf-8 -*- # Copyright (c) Megvii, Inc. and its affiliates.import torch from tor…

JMeter如何开展性能测试

文章目录 性能测试指标理解透彻以及测算微聊性能测试性能测试流程准备流程 ​&#x1f451;作者主页&#xff1a;Java冰激凌 性能测试指标理解透彻以及测算 虚拟用户数&#xff1a; 线程 用户并发数&#xff1a;指在某一时间&#xff0c;一定数量的虚拟用户同时对系统的某个功…

k8s、pod

Pod k8s中的port【端口&#xff1a;30000-32767】 port &#xff1a;为Service 在 cluster IP 上暴露的端口 targetPort&#xff1a;对应容器映射在 pod 端口上 nodePort&#xff1a;可以通过k8s 集群外部使用 node IP node port 访问Service containerPort&#xff1a;容…

【C++】多态 ⑦ ( 多态机制实现原理 | 虚函数表概念 | 虚函数表工作机制 | vptr 指针 | 虚函数表运行时机制 | 虚函数与动态联编 )

文章目录 一、多态原理1、多态成立的三个条件2、虚函数表概念3、虚函数表工作机制4、vptr 指针5、虚函数表运行时机制6、虚函数与动态联编 二、代码示例 - 虚函数表1、代码实例分析 - 虚函数表创建与使用2、完整代码示例 一、多态原理 1、多态成立的三个条件 " 多态 "…

BoredHackerBlog: Cloud AV RT日记

目录 信息搜集 WEB漏洞攻击 拿shell 信息搜集 首先ifconfig查看自己IP&#xff0c; netdiscover查看同网段下主机 第三个应该是目标靶机。用nmap查看靶机开放端口&#xff1a; 开放22和8080&#xff0c;看看8080开的啥服务 WEB漏洞攻击 看到让我们输入邀请码。有输入框的第…

【原创】java+swing+mysql无偿献血管理系统设计与实现

摘要&#xff1a; 无偿献血管理系统是为了实现无偿献血规范化、有序化、高效化的管理而设计的。本文主要介绍使用java语言开发一个基于C/S架构的无偿献血管理系统&#xff0c;提高无偿献血管理的工作效率。 功能分析&#xff1a; 系统主要提供给管理员、无偿献血人员&#x…

C语言实现把两个升序数组合并成一个升序数组

完整代码&#xff1a; // 把两个升序数组合并成一个升序数组 #include<stdio.h> //单个数组的长度 #define N 5int main(){int arr1[N]{1,4,7,8,9};int arr2[N]{2,3,6,9,10};//创建合并后数组int arr3[2*N];//j为arr1的指针&#xff0c;k为arr2的指针int j0,k0;printf(&…

Emscripten + CMakeLists.txt 将 C++ 项目编译成 WebAssembly(.wasm)/js,并编译 Html 测试

背景&#xff1a;Web 端需要使用已有的 C 库&#xff08;使用 CMake 编译&#xff09;&#xff0c;需要将 C 项目编译成 WebAssembly(.wasm) 供 js 调用。 上篇文章《Mac 上安装 Emscripten》 已讲解如何安装配置 Emscripten 环境。 本篇文章主要讲解如何将基于 CMakeLists 配…