以下是在零声教育的听课记录。
如有侵权,请联系我删除。
链接:零声教育官网
一、网络io与select,poll。epoll
网络IO ,会涉及到两个系统对象 一个是 用户空间 调用 IO 的进程或者线程,另一个是 内核空间的 内核系统, 比如 发生 IO 操作 read 时,它会经历两个阶段:
- 等待数据准备就绪
- 将数据 从内核拷贝到进程 或者线程 中。
因为在以上两个阶段上各有不同的情况,所以出现了多种网络IO 模型
1. 五种IO 网络模型
1. 1 阻塞IO blocking IO
在 linux 中,默认情况下所有的 socket 都是 blocking ,一个典型的读操作流程
当用户进程调用了read 这个系统调用, kernel(内核) 就开始了 IO 的第一个阶段:准备数据。对于network IO 来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的数据包),这个时候 kernel 就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当 kernel一直等到数据准备好了,它就会将数据从 kernel 中拷贝到用户内存,然后 kernel 返回结果,用户进程才解除 block 的状态,重新运行起来。
所以, blocking IO 的特点就是在 IO 执行的两个阶段(等待数据和拷贝数据两个阶段)都被block 了。
几乎所有的程序员第一次接触到的网络编程都是从 listen() 、 send() 、 recv() 等接口开始的,这些接口都是阻塞型的。使用这些接口可以很方便的构建服务器 客户机的模型。下面是一个简单地“一问一答”的服务器。
大部分的socket 接口都是阻塞型的。所谓阻塞型接口是指系统调用(一般是IO 接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。
实际上,除非特别指定,几乎所有的IO 接口 ( 包括socket 接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用send()的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
一个简单的改进方案是在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。具体使用多进程还是多线程,并没有一个特定的模式。
传统意义上,进程的开销要远远大于线程,所以如果需要同时为较多的客户机提供服务,则不推荐使用多进程;如果单个服务执行体需要消耗较多的CPU 资源,譬如需要进行大规模或长时间的数据运算或文件访问,则进程较为安全。通常,使用pthread_create ()创建新线程,fork()创建新进程。
我们假设对上述的服务器 / 客户机模型,提出更高的要求,即让服务器同时为多个客户机提供一问一答的服务。于是有了如下的模型。
在上述的线程 / 时间图例中,主线程持续等待客户端的连接请求,如果有连接,则创建新线程,并在新线程中提供为前例同样的问答服务。
很多初学者可能不明白为何一个socket 可以accept 多次。实际上socket 的设计者可能特意为多客户机的情况留下了伏笔,让accept()能够返回一个新的socket。下面是accept 接口的原型:
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
- 功能:接收客户端连接,默认是一个阻塞的函数,阻塞等待客户端连接
- 参数:
- sockfd : 用于监听的文件描述符
- addr : 传出参数,记录了连接成功后客户端的地址信息(ip,port)
- addrlen : 指定第二个参数的对应的内存大小
- 返回值:
- 成功 :用于通信的文件描述符
- -1 : 失败
输入参数sockfd 是从socket(),bind()和listen()中沿用下来的socket 句柄值(其实就是文件描述符)。执行完bind()和listen()后,操作系统已经开始在指定的端口处监听所有的连接请求,如果有请求,则将该连接请求加入请求队列。调用accept()接口正是从 socket 中sockfd 的请求队列抽取第一个连接信息,创建一个与sockfd同类的新的socket 返回句柄。新的socket 句柄即是后续read()和recv()的输入参数。如果请求队列当前没有请求,则accept() 将进入阻塞状态直到有请求进入队列。
上述多线程的服务器模型似乎完美的解决了为多个客户机提供问答服务的要求,但其实并不尽然。如果要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。
很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat 和各种数据库等。但是,“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO 接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。
对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求, 线程池 或 连接池 或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这 个问题。
1.2 非阻塞 IO non blocking IO
Linux 下,可以通过设置 socket 使其变为 non blocking 。当对一个 non blocking socket 执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read 操作时,如果 kernel(内核) 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 error 。从用户进程角度讲 ,它发起一个read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call ,那么它马上就将数据拷贝到了用户内存,然后返回 所以,在非阻塞式 IO 中,用户进程其实是需要不断的主动询问 kernel数据准备好了没有。
在非阻塞状态下, recv() 接口在被调用后立即返回,返回值代表了不同的含义。如在本例中,
- recv()返回值大于 0 ,表示接受数据完毕,返回值即是接受到的字节数
- recv()返回 0 ,表示连接已经正常断开
- recv() 返回 -1 ,且 errno 等于 EAGAIN ,表示 recv 操作还没执行完成;
- recv()返回 -1 ,且 errno 不等于 EAGAIN ,表示 recv 操作遇到系统错误 errno 。非阻塞的接口相比于阻塞型接口的显著差异在于,在被调用之后立即返回。使用如下的函数可以将某句柄 fd 设为非阻塞状态。
fcntl ( fd, F_SETFL, O_NONBLOCK);
下面将给出只用一个线程,但能够同时从多个连接中检测数据是否送达,并且接受数据的模型。
可以看到服务器线程可以通过循环调用recv()接口,可以在单个线程内实现对所有连接的数据接收工作。但是上述模型绝不被推荐。因为,循环调用recv()将大幅度推高CPU占用率;此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。
1.3 多路复用IO(IO multiplexing)
IO multiplexing 这个词可能有点陌生,但是提到select/epoll,大概就都能明白了。有些地方也称这种IO 方式为事件驱动IO(event driven IO)。我们都知道,select/epoll 的好处就在于单个process 就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll 这个function会不断的轮询所负责的所有socket,当某个socket 有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select ,那么整个进程会被 block ,而同时 kernel 会 监视 所有 select 负责的 socket ,当任何一个 socket 中的数据准备好了, select 就会返回。这个时候用户进程再调用 read 操作,将数据从 kernel 拷贝到用户进程。
这个图和blocking IO 的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用 (select 和 read )),而 blocking IO 只调用了一个系统调用 read 。但是使用 select 以后最大的优势是用户可以在一个线程内同时处理多个 socket 的 IO 请求。用户可以注册多个 socket ,然后不断 地调用 select 读取被激活的 socket ,即可达到在同一个线程内同时处理多个 IO 请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。(多说一句:所以,如果处理的连接数不是很高的话,使用select/epoll 的 web server 不一定比使用 multi threading + blocking IO 的 webserver 性能更好,可能延迟还更大。 select/epoll 的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
在多路复用模型中,对于每一个socket ,一般都设置 成为 non blocking ,但是,如上图所示,整个用户的 process 其实是一直被 block 的。只不过 process 是被 select 这个函数 block ,而不是被 socket IO 给 block 。因此 select() 与非阻塞 IO 类似。
大部分 Unix/Linux 都支持 select 函数,该函数用于探测多个文件句柄的状态变化。下面给出 select 接口的原型:
FD_ZERO( int fd, fd_set* fds);
FD_SET( int fd, fd_set* fds);
FD_ ISSET( int fd, fd_set* fds);
FD_CLR( int fd, fd_set* fds);
int select (int nfds, fd_set *readfds, fd_set *writefds, fd_set* exceptfds, struct timeval *timeout)
这里, fd_set 类型可以简单的理解为按 bit 位标记句柄的队列,例如要在某 fd_set中标记一个值为 16 的句柄,则该 fd_set 的第 16 个 bit 位被标记为 1 。具体的置位、验证可使用 FD_SET 、 FD_ISSET 等宏实现。在 select() 函数中, readfds 、 writefds 和exceptfds 同时作为输入参数和输出参数。如果输入的 readfds 标记了 16 号句柄,则select() 将检测 16 号句柄是否可读。在 select() 返回后,可以通过检查 readfds 有否标记 16 号句柄,来判断该 可读 事件是否发生。另外,用户可以设置 timeout 时间。
下面将重新模拟上例中从多个客户端接收数据的模型。
上述模型只是描述了使用select()接口同时从多个客户端接收数据的过程;由于select()
接口可以同时对多个句柄进行读状态、写状态和错误状态的探测,所以可以很容易构建为多
个客户端提供独立问答服务的服务器系统。
这里需要指出的是,客户端的一个 connect() 操作,将在服务器端激发一个“可读事件”,所以 select() 也能探测来自客户端的 connect() 行为。
上述模型中,最关键的地方是如何动态维护select()的三个参数readfds、writefds和exceptfds。作为输入参数,readfds 应该标记所有的需要探测的“可读事件”的句柄,其中永远包括那个探测 connect() 的那个“母”句柄;同时,writefds 和 exceptfds 应该标记所有需要探测的“可写事件”和“错误事件”的句柄 ( 使用 FD_SET() 标记 )。
作为输出参数,readfds、writefds 和exceptfds 中的保存了 select() 捕捉到的所有事件的句柄值。程序员需要检查的所有的标记位 ( 使用FD_ISSET()检查 ),以确定到底哪些句柄发生了事件。
上述模型主要模拟的是“一问一答”的服务流程,所以如果select()发现某句柄捕捉到了"可读事件 ””,服务器程序应及时做 recv() 操作,并根据接收到的数据准备好待发送数据,并将对应的句柄值加入 writefds ,准备下一次的 可写事件 的 select() 探测。同样,如果 select() 发现某句柄捕捉到 可写事件 ””,则程序应及时做 send() 操作,并准备好下一次的 可 读事件 探测准备。
这种模型的特征在于每一个执行周期都会探测一次或一组事件,一个特定的事件会触发某个特定的响应。我们可以将这种模型归类为 事件驱动模型 。
相比其他模型,使用 select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU ,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
但这个模型依旧有着很多问题。首先select() 接口并不是实现 事件驱动 的最好选择。因为当需要探测的句柄值较大时, select() 接口本身需要消耗大 量时间去轮询各个句柄。很 多操作系统提供了更为高效的接口,如 linux 提供了 epoll BSD 提供了 kqueue Solaris提供了 /dev/poll 。如果需要实现更高效的服务器程序,类似 epoll 这样的接口更被推荐。遗憾的是不同的操作系统特供的 epoll 接口有很大差异,所以使用类似于 epoll 的接口实现具有较好跨平台能力的服务器会比较困难。
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。如下例,庞大的执行体 1 的将直接导致响应事件 2 的执行体迟迟得不到执行,并在很大程度上降低了事件探测的及时性。
幸运的是,有很多高效的事件驱动库可以屏蔽上述的困难,常见的事件驱动库有libevent 库,还有作为 libevent 替代者的 libev 库。这些库会根据操作系统的特点选择最合适的事件探测接口,并且加入了信号 (signal) 等技术以支持异步响应,这使得这些库成为构建事件驱动模型的不二选择。下章将介绍如何使用 libev 库替换 select 或 epoll
接口,实现高效稳定的服务器模型。
实际上, Linux 内核从 2.6 开始,也引入了支持异步响应的 IO 操作,如 aio_read,aio_write ,这就是异步 IO 。
1.4 异步 IO Asynchronous I/O
Linux 下的 asynchronous IO 用在磁盘 IO 读写操作 ,不用于网络 IO ,从内核 2.6 版本才开始引入。先看一下它的流程
Linux中,可以调用 aio_read 函数告诉内核描述字缓冲区指针和缓冲区的大小、文件偏移及通知的方式,然后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序。
用户进程发起read 操作之后,立刻就可以开始去做其它的事。而另一方面,从 kernel的角度,当它受到一个 asynchronous read 之后,首先它会立刻返回,所以不会对用户进程产生任何 block 。然后, kernel 会等待数据准备完成,然后将数据拷贝到用 户内存,当这一切都完成之后, kernel 会给用户进程发送一个 signal ,告诉它 read 操作完成了。
用异步IO 实现的服务器这里就不举例了,以后有时间另开文章来讲述。异步 IO 是真正非阻塞的,它不会对请求进程产生任何的阻塞,因此对高并发的网络服务器实现至关重要。
到目前为止,已经将四个IO 模型都介绍完了。现在回过头来回答最初的那几个问题:blocking 和 non blocking 的区别在哪, synchronous IO 和 asynchronous IO 的区别在哪。
先回答最简单的这个:blocking 与 non blocking 。前面的介绍中其实已经很明确的说明了这两者的区别。调用 blocking IO 会一直 block 住对应的进程直到操作完成,而non blocking IO 在 kernel 还在准备数据的情况下会立刻返回。
两者的区别就在于 synchronous IO 做 ”IO 的时候会将 process 阻塞。按照这个定义,之前所述的 blocking IO non blocking IO IO multiplexing 都属于synchronous IO 。有人可能会说, non blocking IO 并没有被 block 啊。这里有个非常狡猾 的地方,定义中所指的 ”IO 是指真实的 IO 操作,就是例子中的 read 这个系统调用。 non blocking IO 在执行 read 这个系统调用的时候,如果 kernel 的数据没有准备好,这时候不会 block 进程。但是当 kernel 中数据准备好的时候, read 会将数据从 kernel 拷贝到用户内存中,这个时候进程是被 block 了,在这段时间内进程是被 block
的。 而 asynchronous IO 则不一样,当进程发起 IO 操作之后,就直接返回再也不理睬了,直到 kernel 发送一个信号,告诉进程说 IO 完成。在这整个过程中,进程完全没有被 block 。
1.5 信号驱动 IO signal driven I/O SIGIO(不常用)
首先我们允许套接口进行信号驱动 I/O, 并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个 SIGIO 信号,可以在信号处理函数中调用 I/O 操作函数处理数据。当数据报准备好读取时,内核就为该进程产生一个 SIGIO 信号。我们随后既可以在信号处理函数中调用 read 读取数据报,并通知主循环数据已准备好待处理,也可以立即通知主循环,让它来读取数据报。无论如何处理 SIGIO 信号,这种模型的优势在于等待数据报到达 第一阶段 期间,进程可以继续执行,不被阻塞。免去了 select 的阻塞与轮询,当有活跃套接字时,由注册的 handler 处理。
经过上面的介绍,会发现 non blocking IO 和 asynchronous IO 的区别还是很明显的。在non blocking IO 中,虽然进程大部分时间都不会被 block ,但是它仍然要求进程去主动的 check, 并且当数据准备完成以后,也需要进程主动的再次调用 recvfrom 来将数据拷贝到用户内存。而 asynchronous IO 则完全不同。它就像是用户进程将整个 IO 操作交给了他人( kernel )完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查 IO 操作的状态,也不需要主动的去拷贝数据。
内核在第一个阶段是异步,在第二个阶段是同步;与非阻塞IO的区别在于它提供了消息通知机制,不需要用户进程不断的轮询检查,减少了系统API的调用次数,提高了效率。
2. 服务器模型 Reactor 与 Proactor
对高并发编程网络连接上的消息处理,可以分为两个阶段:等待消息准备好、消息处理。当使用默认的阻塞套接字时(例如上面提到的 1 个线程捆绑处理 1 个连接),往往是把这两个阶段合而为 一,这样操作套接字的代码所在的线程就得睡眠来等待消息准备好,这导致了高并发下线程会频繁的睡眠、唤醒,从而影响了 CPU 的使用效率。
高并发编程方法当然就是把两个阶段分开处理。即,等待消息准备好的代码段,与处理消息的代码段是分离的。当然,这也要求套接字必须是非阻塞的,否则,处理消息的代码段很容易导致条件不满足时,所在线程又进入了睡眠等待阶段。那么问题来了,等待消息准备好这个阶段怎么实现?它毕竟还是等待,这意味着线程还是要睡眠的!解决办法就是,线程主动查询,或者让 1 个线程为所有连接而等待!这就是 IO 多 路复用了。多路复用就是处理等待消息准备好这件事的,但它可以同时处理多个连接!它也可能 等待 ””,所以它也会导致线程睡眠,然而这不要紧,因为它一对多、它可以监控所有连接。这样,当我们的线程被唤醒执行时,就一定是有一些连接准备好被我们的代码执行了。
作为一个高性能服务器程序通常需要考虑处理三类事件: I/O 事件,定时事件及信号。两种 高效 的事件处理模型: Reactor 和 Proactor 。
2.1 Reactor 模型
首先来回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将结果和控制权返回给程 序,程序继续处理。 Reactor 释义 “反应堆 ”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反, Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的事件发生, Reactor 将主动调用应用程序注册的接口,这些接口又称为 回调函数 。
Reactor模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O ,中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程 进程阻塞在多路复用器上;一旦有 I/O 事件到来或是准备就绪 文件描述符或 socket 可读、写 )),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
Reactor 模型有三个重要的组件:
- 多路复用器:由操作系统提供,在linux 上一般是select, poll, epoll 等系统调用。
- 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。
- 事件处理器:负责处理特定事件的处理函数。
具体流程如下:- 注册读就绪事件和相应的事件处理器;
- 事件分离器等待事件;
- 事件到来,激活分离器,分离器调用事件对应的处理器;
- 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权。
Reactor 模式是编写高性能网络服务器的必备技术之一,它具有如下的优点:
- 响应快,不必为单个同步时间所阻塞,虽然Reactor 本身依然是同步的;
- 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
- 可扩展性,可以方便的通过增加Reactor 实例个数来充分利用CPU 资源;
- 可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;
Reactor 模型开发效率上比起直接使用IO 复用要高,它通常是单线程的,设计目标是希望单线程使用一颗CPU 的全部资源,但也有附带优点,即每个事件处理中很多时候可以不考虑共享资源的互斥访问。可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力,当程序需要使用多核资源时,Reactor 模型就会悲剧, 为什么呢?
如果程序业务很简单,例如只是简单的访问一些提供了并发访问的服务,就可以直接开启多个反应堆,每个反应堆对应一颗CPU 核心,这些反应堆上跑的请求互不相关,这是完全可以利用多核的。例如Nginx 这样的http 静态服务器。
2.2 Proactor 模型
具体流程如下:
- 处理器发起异步操作,并关注I/O 完成事件
- 事件分离器等待操作完成事件
- 分离器等待过程中,内核并行执行实际的I/O 操作,并将结果数据存入用户自定义缓冲区,最后通知事件分离器读操作完成
- I/O 完成后,通过事件分离器呼唤处理器
- 事件处理器处理用户自定义缓冲区中的数据
从上面的处理流程,我们可以发现proactor 模型最大的特点就是使用异步I/O。所有的I/O 操作都交由系统提供的异步I/O 接口去执行。工作线程仅仅负责业务逻辑。在Proactor 中,用户函数启动一个异步的文件操作。同时将这个操作注册到多路复用器上。多路复用器并不关心文件是否可读或可写而是关心这个异步读操作是否完成。异步操作是操作系统完成,用户程序不需要关心。多路复用器等待直到有完成通知到来。当操作系统完成了读文件操作 将读到的数据复制到了用户先前提供的缓冲区之后,通知多路复用器相关操作已完成。多路复用器再调 用相应的处理程序,处理数据。
Proactor增加了编程的复杂度,但给工作线程带来了更高的效率。Proactor可以在系统态将读写优化,利用I/O并行能力,提供一个高性能单线程模型。在windows上,由于没有epoll这样的机制,因此提供了IOCP来支持高并发, 由于操作系统做了较好的优化,windows较常采用Proactor的模型利用完成端口来实现服务器。在linux上,在2.6内核出现了aio接口,但aio实际效果并不理想,它的出现,主要是解决poll性能不佳的问题,但实际上经过测试,epoll的性能高于poll+aio,并且aio不能处理accept,因此linux主要还是以Reactor模型为主。
在不使用操作系统提供的异步 I/O 接口的情况下,还可以使用 Reactor 来模拟 Proactor,差别是:使用异步接口可以利用系统提供的读写并行能力,而在模拟的情况下,这需要在用户态实现。具体的做法只需要这样:
- 注册读事件(同时再提供一段缓冲区)
- 事件分离器等待可读事件
- 事件到来,激活分离器,分离器(立即读数据,写缓冲区)调用事件处理器
- 事件处理器处理数据,删除事件 需要再用异步接 口注册
我们知道, Boost.asio 库采用的即为 Proactor 模型。不过 Boost.asio 库在 Linux 平台采用epoll 实现的 Reactor 来模拟 Proactor ,并且另外开了一个线程来完成读写调度。
2.3 同步 I/O 模拟 Proactor 模型
1. 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
2. 主线程调用 epoll_wait 等待 socket 上有数据可读。
3. 当 socket 上有数据可读时, epoll_wait 通知主线程。主线程从 socket 循环读取数据,直到没有更多数据 可读,然后将读取到的数据封装成一个请求对象并插入请求队列。
4. 睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往 epoll 内核事件表中注册 socket 上的写就绪事件。
5. 主线程调用 epoll_wait 等待 socket 可写。
6. 当 socket 可写时, epoll_wait 通知主线程。主线程往 socket 上写入服务器处理客户请求的结果。
两个模式的相同点,都是对某个IO 事件的事件通知 即告诉某个模块,这个 IO 操作可以进行或已经完成 。在结构上两者也有相同点: demultiplexor 负责提交 IO 操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调注册处理函数。
不同点在于,异步情况下( Proactor),当回调注册的处理函数时,表示 IO 操作已经完成;同步情况下 ( Reactor),回调注册的处理函数时,表示 IO 设备可以进行某个操作 (can read or can write) ,注册的处理函数这个时候开始提交操作。
3. 代码解析
3.1 深入理解socket中的recv函数和send函数
这部分我觉得这位博主写的比较好,以下引自CSDN博主「Gopher大威」的原创文章
版权声明:本文为CSDN博主「Gopher大威」的原创文章
原文链接:https://blog.csdn.net/qq_36915078/article/details/107728888
3.1.1、函数原型
recv函数用于socket通信中接收消息,接口定义如下:
ssize_t recv(int socket, void *buf, size_t len, int flags)
参数一:指定接收端套接字描述符;
参数二:指向一个缓冲区,该缓冲区用来存放recv函数接收到的数据;
参数三:指明buf的长度;
参数四:一般置为0;
返回值:失败时,返回值小于0;超时或对端主动关闭,返回值等于0;成功时,返回值是返回接收数据的长度。
send函数用于socket通信中发送消息,接口定义如下:
ssize_t send(int socket, const void *buf, size_t len, int flags);
参数一:指定发送端套接字描述符;
参数二:指明一个存放应用程序要发送数据的缓冲区;
参数三:指明实际要发送的数据的字节数;
参数四:一般置0;
返回值:失败时,返回值小于0;超时或对端主动关闭,返回值等于0;成功时,返回值是返回发送数据的长度。
3.1.2、TCP socket中的buffer
每个TCP socket在内核中都有一个发送缓冲区和一个接受缓冲区,TCP的全双工工作模式以及TCP的流量和拥塞控制便依赖于这两个独立的buffer以及buffer的填充状态。
在这里插入图片描述
接受缓冲区把数据缓存入内核,如果没有调用read()系统调用的话,数据会一直缓存在socket的接受缓冲区内。不管进程是否调用recv()读取socket,对等端发来的数据都会经由内核接受并且缓存到socket的内核接受缓冲区之中。recv()所做的工作,就是把内核缓冲区中的数据拷贝到应用层用户的buffer里面,并返回拷贝的字节数。(注意:是拷贝,不是像read那样读取之后,清空接受缓冲区内的数据。)
进程调用send()发送数据的时候,将数据拷贝到socket的内核发送缓冲区之中,然后返回拷贝的字节数。send()返回之时,数据不一定会发送到对等端去,send()仅仅是把应用层buffer的数据拷贝到socket的内核发送缓冲区中,发送是TCP的事情。(注意:这里也是拷贝,不是像write那样发送之后,清空发送缓冲区内的数据。)
接受缓冲区被TCP用来缓存网络上接收到的数据,一直保存到应用进程读走为止。如果应用进程一直没有读取,接受缓冲区满了以后,发生的动作是:接收端通知发送端,接收窗口关闭(win=0)。这个便是滑动窗口上的实现。保证TCP套接口接受缓冲区不会溢出,从而保证了TCP是可靠传输。因为对方不允许发出超过所通告窗口大小的数据。这就是TCP的流量控制,如果对方无视窗口大小而发出了超过窗口大小的数据,则接收方TCP将丢弃它。
3.1.3、end()的工作原理
send()函数只负责将数据提交给协议层。当调用该函数时,send()先比较待发送数据的长度和套接字的发送缓冲区的长度:
- 当待拷贝数据的长度大于发送缓冲区的长度时,该函数返回SOCKET_ERROR;
- 当待拷贝数据的长度小于或等于发送缓冲区的长度时,那么send先检查协议是否正在发送发送套接字的发送缓冲区中的数据:
如果是就等待协议把数据发送完,再进行拷贝;
如果协议还没有开始发送套接字的发送缓冲区中的数据或者该发送缓冲区中没有数据,那么send就比较该发送缓冲区中的剩余空间和待拷贝数据的长度:
如果待拷贝数据的长度大于剩余空间的大小,send就一直等待协议把该发送缓冲区中的数据发完;
如果待拷贝数据的长度小于剩余空间大小,send就仅仅把buf中的数据拷贝到剩余空间中。(注意:并不是send把该套接字的发送缓冲区中数据传到连接的另一端,而是协议传的,send仅仅是把数据拷贝到该发送缓冲区的剩余空间里面。)
如果send函数拷贝成功,就返回实际拷贝的字节数;如果拷贝的过程中出现错误,send就返回SOCKET_ERROR;如果send在等待协议传送数据时网络断开的话,那么send函数也返回SOCKET_ERROR。
要注意,send函数把buffer中的数据成功拷贝到套接字的发送缓冲区中的剩余空间里面后,它就返回了,但是此时这些数据并不一定马上被传到连接的另一端。如果协议在后续的传输过程中出现网络错误的话,那么下一个socket函数就会返回SOCKET_ERROR。(每一个除send外的socket函数在执行的最开始总要先等待套接字的发送缓冲区的数据被协议传送完毕才能继续,如果在等待时出现网络错误,那么该socket函数就返回SOCKET_ERROR。)
3.1.4、recv()的工作原理
recv先检查套接字的接收缓冲区,如果该接收缓冲区中没有数据或者协议正在接收数据,那么recv就一直等待,直到协议把数据接收完毕。当协议把数据接收完毕,recv函数就把套接字的接收缓冲区中的数据拷贝到用户层的buffer中,(注意:协议接收到的数据可能大于buffer的长度,所以在这种情况下,要调用几次recv函数才能把套接字接收缓冲区中的数据拷贝完。)recv函数仅仅是拷贝数据,真正的接收数据是协议来完成的。
recv函数返回其实际拷贝的字节数。如果recv在拷贝时出错,那么就返回SOCKET_ERROR;如果recv函数在等待协议接收数据时网络中断了,那么它返回0。对方优雅的关闭socket并不影响本地recv的正常接收数据,如果协议缓冲区内没有数据,recv返回0,指示对方关闭;如果协议缓冲区有数据,则返回对应数据(可能需要多次recv),在最后一次recv时,返回0,指示对方关闭。
3.1.5、应用
在处理粘包问题的时候,其中一种方法是在包尾加上’\n’。加上‘\n’以后读取数据过程如下:
//@ssize_t:返回读的长度,若ssize_t < count,则表示读失败。
//@buf:接收数据内存首地址
//@count:接收数据长度
ssize_t readn(int fd, const void* buf, size_t count){
size_t nletf = count;
ssize_t nread;
char *bufp = (char *)buf;
while(nleft > 0){
if((nread = read(fd, bufp, nleft)) < 0){
if(errno == EINTR)
continue; //如果是中断,则继续读。
return -1;
}else if(nread == 0){ //若对方已关闭
return count - nleft; //返回读到的字节数
}
bufp += nread;
nleft -= nread;
}
return count;
}
ssize_t recv_peek(int sockfd, void *buf, size_t len){
while(1){
int ret = recv(sockfd, buf, len, MSG_PEEK); //MSG_PEEK,仅把tcp 接收缓冲区中的数据读取到buf中,并不把已读取的数据从tcp 接收缓冲区中移除,再次调用recv仍然可以读到刚才读到的数据。
if(ret == -1 && errno == EINTR)
continue;
return ret;
}
}
//@maxline 一行最大数
//先提前peek一下缓冲区,如果有数据,则从缓冲区中拷贝数据
//1、缓冲区中的数据带\n
//2、缓冲区中的数据不带\n
ssize_t readline(int sockfd, void *buf, size_t maxline){
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
int count = 0;
while(1){
//看一下缓冲区中有没有数据,并不移除内核缓冲区中的数据
ret = recv_peek(sockfd, bufp, nleft);
if(ret < 0) //失败
return ret;
else if(ret == 0) //对方已关闭
return ret;
nread = ret;
int i;
for(i = 0; i < nread; i++){
if(bufp[i] == '\n'){ //若缓冲区有\n
ret = readn(sockfd, bufp, i+1) ; //读走数据
if(ret != i+1)
exit(EXIT_FAILURE);
return ret + count; //有\n就返回,并返回读走的数据
}
}
if(nread > nleft) //如果读到的数据大于一行最大数, 异常处理
exit(EXIT_FAILURE);
nleft -= nread; //若缓冲区没有\n,把剩余的数据读走。
ret = readn(sockfd, bufp, nread);
if(ret != nread)
exit(EXIT_FAILURE);
bufp += nread; //bufp指针后移,再接着偷看(recv_peek)缓冲区数据,直到遇到\n
count += nread;
}
return -1;
}
在readline函数中,我们先用recv_peek”偷窥“ 一下现在缓冲区有多少个字符并读取到bufp,然后查看是否存在换行符’\n’。
如果存在,则使用readn连同换行符一起读取(清空缓冲区);
如果不存在,也清空一下缓冲区, 且移动bufp的位置,回到while循环开头,再次窥看。
注意,当我们调用readn读取数据时,那部分缓冲区是会被清空的,因为readn调用了read函数。
还需注意一点是,如果第二次才读取到了’\n’,则先用count保存了第一次读取的字符个数,然后返回的ret需加上原先的数据大小。
3.1.6、补充
在进行TCP协议传输的时候,要注意数据流传输的特点,recv和send不一定时一一对应的,也就是说并不是send一次,就一定recv一次就接收完,有可能send一次,recv多次才接收完,也有可能send多次,一次recv就接收完了。
TCP协议会保证数据的有序完整的传输,但是如何去正确完整的处理每一条信息,是程序员的事情。例如,服务器在循环recv,recv的缓冲区大小为100byte,客户端在循环send,每次send 6byte数据,则recv每次收到的数据可能为6byte, 12byte, 18byte,这是随机的,编程的时候注意正确处理。
3.2 select,poll,epoll代码解析
以下三个函数引出是牛客网的教程:https://www.nowcoder.com/courses/cover/live/690
3.2.1 select
// sizeof(fd_set) = 128 1024
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
- 参数:
- nfds : 委托内核检测的最大文件描述符的值 + 1
- readfds : 要检测的文件描述符的读的集合,委托内核检测哪些文件描述符的读的属性,一般检测读操作
- 对应的是对方发送过来的数据,因为读是被动的接收数据,检测的就是读缓冲区
- 是一个传入传出参数
- writefds : 要检测的文件描述符的写的集合,委托内核检测哪些文件描述符的写的属性
- 委托内核检测写缓冲区是不是还可以写数据(不满的就可以写)
- exceptfds : 检测发生异常的文件描述符的集合
- timeout : 设置的超时时间
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
- NULL : 永久阻塞,直到检测到了文件描述符有变化
- tv_sec = 0 tv_usec = 0, 不阻塞
- tv_sec > 0 tv_usec > 0, 阻塞对应的时间
- 返回值 :
- -1 : 失败
- >0(n) : 检测的集合中有n个文件描述符发生了变化
// 将参数文件描述符fd对应的标志位设置为0
void FD_CLR(int fd, fd_set *set);
// 判断fd对应的标志位是0还是1, 返回值 : fd对应的标志位的值,0,返回0, 1,返回1
int FD_ISSET(int fd, fd_set *set);
// 将参数文件描述符fd 对应的标志位,设置为1
void FD_SET(int fd, fd_set *set);
// fd_set一共有1024 bit, 全部初始化为0
void FD_ZERO(fd_set *set);
3.2.2 poll
#include <poll.h>
struct pollfd {
int fd; /* 委托内核检测的文件描述符 */
short events; /* 委托内核检测文件描述符的什么事件 */
short revents; /* 文件描述符实际发生的事件 */
};
struct pollfd myfd;
myfd.fd = 5;
myfd.events = POLLIN | POLLOUT;
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- 参数:
- fds : 是一个struct pollfd 结构体数组,这是一个需要检测的文件描述符的集合
- nfds : 这个是第一个参数数组中最后一个有效元素的下标 + 1
- timeout : 阻塞时长
0 : 不阻塞
-1 : 阻塞,当检测到需要检测的文件描述符有变化,解除阻塞
>0 : 阻塞的时长
- 返回值:
-1 : 失败
>0(n) : 成功,n表示检测到集合中有n个文件描述符发生变化
3.2.3 epoll
Epoll 的工作模式:
- LT 模式 (水平触发)
假设委托内核检测读事件 -> 检测fd的读缓冲区
读缓冲区有数据 - > epoll检测到了会给用户通知:
- a.用户不读数据,数据一直在缓冲区,epoll 会一直通知
- b.用户只读了一部分数据,epoll会通知
- c.缓冲区的数据读完了,不通知
LT(level - triggered)是缺省的工作方式,并且同时支持 block 和 no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的 fd 进行 IO 操作。如果你不作任何操作,内核还是会继续通知你的。
- ET 模式(边沿触发)
假设委托内核检测读事件 -> 检测fd的读缓冲区
读缓冲区有数据 - > epoll检测到了会给用户通知
- a.用户不读数据,数据一致在缓冲区中,epoll下次检测的时候就不通知了
- b.用户只读了一部分数据,epoll不通知
- c.缓冲区的数据读完了,不通知
ET(edge - triggered)是高速工作方式,只支持 no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了。但是请注意,如果一直不对这个 fd 作 IO 操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。
ET 模式在很大程度上减少了 epoll 事件被重复触发的次数,因此效率要比 LT 模式高。epoll工作在 ET 模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
#include <sys/epoll.h>
/*创建一个新的epoll实例。在内核中创建了一个数据,这个数据中有两个比较重要的数据,
一个是需要检测的文件描述符的信息(红黑树),还有一个是就绪列表,
存放检测到数据发送改变的文件描述符信息(双向链表)。*/
int epoll_create(int size);
- 参数:
size : 目前没有意义了。随便写一个数,必须大于0
- 返回值:
-1 : 失败
> 0 : 文件描述符,操作epoll实例的
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
常见的Epoll检测事件:
- EPOLLIN
- EPOLLOUT
- EPOLLERR
- EPOLLET
// 对epoll实例进行管理:添加文件描述符信息,删除信息,修改信息
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- 参数:
- epfd : epoll实例对应的文件描述符
- op : 要进行什么操作
EPOLL_CTL_ADD: 添加
EPOLL_CTL_MOD: 修改
EPOLL_CTL_DEL: 删除
- fd : 要检测的文件描述符
- event : 检测文件描述符什么事情
-
// 检测函数
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int
timeout);
- 参数:
- epfd : epoll实例对应的文件描述符
- events : 传出参数,保存了发送了变化的文件描述符的信息
- maxevents : 第二个参数结构体数组的大小
- timeout : 阻塞时间
- 0 : 不阻塞
- -1 : 阻塞,直到检测到fd数据发生变化,解除阻塞
- > 0 : 阻塞的时长(毫秒)
- 返回值:
- 成功,返回发送变化的文件描述符的个数 > 0
- 失败 -1
3.2.4 综合代码
这份代码建议大家用VScode打开,需要哪部分可以直接打开,看起来跟舒服
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAXLNE 4096 //缓存池4K
#define POLL_SIZE 1024 //poll和epoll那个数组的大小
//8m * 4G = 128 , 512
//C10k
void *client_routine(void *arg) { //多线程中与客户端通信
int connfd = *(int *)arg;
char buff[MAXLNE];
while (1) {
int n = recv(connfd, buff, MAXLNE, 0);//哪个文件描述符,放到哪里,容器多大,从这个容器的什么位置开始。n表示接收了多少数据
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(connfd, buff, n, 0);//哪个文件描述符,从哪个容器发数据,发多少数据,从容器的什么位置开始
} else if (n == 0) {
close(connfd);
break;
}
}
return nullptr;
}
int main(int argc, char **argv) {
int listenfd, connfd, n;
char buff[MAXLNE];
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
return -1;
}
struct sockaddr_in servaddr;//服务端的地址
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //主机转网络IP
servaddr.sin_port = htons(9999); //主机转网络端口
//绑定服务器的地址和端口
if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
//开放监听
if (listen(listenfd, 10) == -1) {
printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
#if 0
//单线程一个客户端的通信(最简单)
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {//从已完成三次握手的连接队列取出一个
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
printf("========waiting for client's request========\n");
while (1) {//开始通信
n = recv(connfd, buff, MAXLNE, 0);//没考虑异常情况啊if (n == -1) {
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(connfd, buff, n, 0);
} else if (n == 0) {
close(connfd);
}
//close(connfd);
}
#elif 0
//单线程一个客户端的通信,把accept函数放在循环里面将无法正常通信。
printf("========waiting for client's request========\n");
while (1) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
n = recv(connfd, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(connfd, buff, n, 0);
} else if (n == 0) {
close(connfd);
}
//close(connfd);
}
#elif 0 //多线程
//多线程
while (1) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {//如果没有客户端连接,主函数会阻塞在这
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
pthread_t threadid;
pthread_create(&threadid, NULL, client_routine, (void*)&connfd);
}
#elif 0 // select的用法,
// select的用法,
/*1.一请求一线程,很难突破C10K的并发量
2. 一个select可以做到1024个文件描述符fd的管理,如果我们多开几个线程每一个线程用一个select,
可以突破C10K的并发量,但是很难突破C1000K,他需要将监听的fd全部拷贝到内核态,再拷贝出来会消耗大量资源。
*/
fd_set rfds, rset, wfds, wset;
FD_ZERO(&rfds);
FD_SET(listenfd, &rfds);
FD_ZERO(&wfds);
int max_fd = listenfd;
while (1) {
rset = rfds;
wset = wfds;
int nready = select(max_fd+1, &rset, &wset, NULL, NULL);
if (FD_ISSET(listenfd, &rset)) { //有客户端请求连接。
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
FD_SET(connfd, &rfds);
if (connfd > max_fd) max_fd = connfd;
if (--nready == 0) continue; // 说明只有一个客户请求连接,没有客户需要通信
}
int i = 0;
for (i = listenfd+1;i <= max_fd;i ++) {
if (FD_ISSET(i, &rset)) { // 有客户发信息来了
n = recv(i, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
FD_SET(i, &wfds);
//reactor
//send(i, buff, n, 0);
} else if (n == 0) { //客户断开连接
FD_CLR(i, &rfds); //回收资源
//printf("disconnect\n");
close(i);
}
if (--nready == 0) break;
} else if (FD_ISSET(i, &wset)) { // 需要发信息给客户
send(i, buff, n, 0);
FD_SET(i, &rfds);
}
}
}
#elif 0 // poll的用法,
// poll的用法,
/*
2.他没有1024的文件描述符的限制,使用结构体数组来存储他的fd,
数组的大小可以自己指定。(每个fd有输入、输出和错误可以检测)
可以突破C10K的并发量,但是很难突破C1000K,他需要将监听的fd全部拷贝到内核态,再拷贝出来会消耗大量资源。
*/
struct pollfd fds[POLL_SIZE] = {0};
fds[listenfd].fd = listenfd;
fds[listenfd].events = POLLIN;
int max_fd = listenfd;
int i = 0;
for (i = 1;i < POLL_SIZE; i++) {
fds[i].fd = -1;
}
while (1) {
int nready = poll(fds, max_fd+1, -1);
if (fds[listenfd].revents & POLLIN) {//有客户进来
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
printf("accept \n");
fds[connfd].fd = connfd;
fds[connfd].events = POLLIN;
if (connfd > max_fd) max_fd = connfd;
if (--nready == 0) continue;
}
//int i = 0;
for (i = listenfd+1;i <= max_fd; i++) {
if (fds[i].revents & POLLIN) { //检测读事件
n = recv(i, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(i, buff, n, 0);
} else if (n == 0) { //客户断开
fds[i].fd = -1;
close(i);
}
if (--nready == 0) break;
}
}
}
#else // epoll的用法,(服务器并发的核心)
//poll/select -->
// epoll_create 创建一个集合
// epoll_ctl(ADD, DEL, MOD)
// epoll_wait
// epoll的用法,(服务器并发的核心)
/* 默认是水平触发。
1.创建一个新的epoll实例。在内核中创建了一个数据,这个数据中有两个比较重要的数据,一个是需要检
测的文件描述符的信息(红黑树),还有一个是就绪列表,存放检测到数据发送改变的文件描述符信息(双向链表)
*/
int epfd = epoll_create(1); //int size,以前是用固定大小的数组,现在用可变大小的链表管理
struct epoll_event events[POLL_SIZE] = {0}; //POLL_SIZE表示每次取出需要通信的量(IO处理生产者,消费者模型)。
//假设有100w的并发(客户),但是真正活跃的有1W就非常高了
struct epoll_event ev;//需要检测的事件
ev.events = EPOLLIN;
ev.data.fd = listenfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
while (1) {
int nready = epoll_wait(epfd, events, POLL_SIZE, 5);
if (nready == -1) {
continue;
}
int i = 0;
for (i = 0;i < nready;i ++) {
int clientfd = events[i].data.fd;
if (clientfd == listenfd) {//有客户端来
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
printf("accept\n");
ev.events = EPOLLIN;
ev.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
} else if (events[i].events & EPOLLIN) {//有消息到
n = recv(clientfd, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(clientfd, buff, n, 0);
} else if (n == 0) { //
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);
close(clientfd);
}
}
}
}
#endif
close(listenfd);
return 0;
}