Linux 网络IO管理(epoll实现)

news2024/11/14 19:00:28

文章目录

  • 1、网络IO模型
    • 1.1、阻塞IO(blocking IO)
    • 1.2、非阻塞IO(non-blocking IO)
    • 1.3、多路复用 IO(IO multiplexing)
    • 1.4、异步 IO(Asynchronous I/O)
    • 1.5、信号驱动 IO(signal driven I/O, SIGIO)
    • 1.6、服务器模型 Reactor 与 Proactor
    • Reactor 模型
    • 同步 I/O 模拟 Proactor 模型
  • 2、select、poll、epoll实现
  • 3、reactor模型实现

1、网络IO模型

1.1、阻塞IO(blocking IO)

在linux 中,默认情况所有的socket都是blocking,一个典型的读操作流程

在这里插入图片描述
当用户进程调用了 read 这个系统调用,kernel 就开始了 IO 的第一个阶段:准备数据。对于network io 来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的数据包),这个时候 kernel 就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当 kernel一直等到数据准备好了,它就会将数据从 kernel 中拷贝到用户内存,然后 kernel 返回结果,用户进程才解除 block 的状态,重新运行起来。所以,blocking IO 的特点就是在 IO 执行的两个阶段(等待数据和拷贝数据两个阶段)都被block 了。几乎所有的程序员第一次接触到的网络编程都是从 listen()、send()、recv() 等接口开始的,这些接口都是阻塞型的。使用这些接口可以很方便的构建服务器/客户机的模型。下面是一个简单地“一问一答”的服务器。

1.2、非阻塞IO(non-blocking IO)

Linux 下,可以通过设置 socket 使其变为 non-blocking。当对一个 non-blocking socket 执行读
操作时,流程是这个样子:

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那
么它并不会 block 用户进程,而是立刻返回一个 error。从用户进程角度讲 ,它发起一个read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回,所以,在非阻塞式 IO 中,用户进程其实是需要不断的主动询问 kernel数据准备好了没有。

在非阻塞状态下,recv() 接口在被调用后立即返回,返回值代表了不同的含义。如在本例中,非阻塞的接口相比于阻塞型接口的显著差异在于,在被调用之后立即返回。使用如下的函数可以将某句柄 fd 设为非阻塞状态。

1.3、多路复用 IO(IO multiplexing)

IO multiplexing 这个词可能有点陌生,但是提到 select/epoll,大概就都能明白了。有些地方也称这种 IO 方式为事件驱动 IO(event driven IO)。我们都知道,select/epoll 的好处就在于单个 process 就可以同时处理多个网络连接的 IO。它的基本原理就是 select/epoll 这个 function会不断的轮询所负责的所有 socket,当某个 socket 有数据到达了,就通知用户进程。

在这里插入图片描述
当用户进程调用了 select,那么整个进程会被 block,而同时,kernel 会“监视”所有 select 负责的 socket,当任何一个 socket 中的数据准备好了,select 就会返回。这个时候用户进程再调用 read 操作,将数据从 kernel 拷贝到用户进程。这个图和 blocking IO 的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select 和 read),而 blocking IO 只调用了一个系统调用(read)。但是使用 select 以后最大的优势是用户可以在一个线程内同时处理多个 socket 的 IO 请求。用户可以注册多个 socket,然后不断地调用 select 读取被激活的 socket,即可达到在同一个线程内同时处理多个 IO 请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。(多说一句:所以,如果处理的连接数不是很高的话,使用select/epoll 的 web server 不一定比使用 multi-threading + blocking IO 的 web server 性能更好,可能延迟还更大。select/epoll 的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

1.4、异步 IO(Asynchronous I/O)

Linux 下的 asynchronous IO 用在磁盘 IO 读写操作,不用于网络 IO,从内核 2.6 版本才开始引入。先看一下它的流程

在这里插入图片描述

用户进程发起 read 操作之后,立刻就可以开始去做其它的事。而另一方面,从 kernel的角度,当它受到一个 asynchronous read 之后,首先它会立刻返回,所以不会对用户进程产生任何 block。然后,kernel 会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel 会给用户进程发送一个 signal,告诉它 read 操作完成了。

1.5、信号驱动 IO(signal driven I/O, SIGIO)

首先我们允许套接口进行信号驱动 I/O,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个 SIGIO 信号,可以在信号处理函数中调用 I/O 操作函数处理数据。当数据报准备好读取时,内核就为该进程产生一个 SIGIO 信号。我们随后既可以在信号处理函数中调用 read 读取数据报,并通知主循环数据已准备好待处理,也可以立即通知主循环,让它来读取数据报。无论如何处理 SIGIO 信号,这种模型的优势在于等待数据报到达(第一阶段)期间,进程可以继续执行,不被阻塞。免去了 select 的阻塞与轮询,当有活跃套接字时,由注册的 handler 处理。

在这里插入图片描述

1.6、服务器模型 Reactor 与 Proactor

对高并发编程,网络连接上的消息处理,可以分为两个阶段:等待消息准备好、消息处理。当使用默认的阻塞套接字时(例如上面提到的 1 个线程捆绑处理 1 个连接),往往是把这两个阶段合而为一,这样操作套接字的代码所在的线程就得睡眠来等待消息准备好,这导致了高并发下线程会频繁的睡眠、唤醒,从而影响了 CPU 的使用效率。高并发编程方法当然就是把两个阶段分开处理。即,等待消息准备好的代码段,与处理消息的代码段是分离的。当然,这也要求套接字必须是非阻塞的,否则,处理消息的代码段很容易导致条件不满足时,所在线程又进入了睡眠等待阶段。那么问题来了,等待消息准备好这个阶段怎么实现?它毕竟还是等待,这意味着线程还是要睡眠的!解决办法就是,线程主动查询,或者让 1 个线程为所有连接而等待!这就是 IO 多路复用了。多路复用就是处理等待消息准备好这件事的,但它可以同时处理多个连接!它也可能“等待”,所以它也会导致线程睡眠,然而这不要紧,因为它一对多、它可以监控所有连接。这样,当我们的线程被唤醒执行时,就一定是有一些连接准备好被我们的代码执行了。作为一个高性能服务器程序通常需要考虑处理三类事件: I/O 事件,定时事件及信号。两种高效的事件处理模型:Reactor 和 Proactor。

Reactor 模型

首先来回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将结果和控制权返回给程序,程序继续处理。Reactor 释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的时间发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函数”。Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。

Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。

同步 I/O 模拟 Proactor 模型

  1. 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
  2. 主线程调用 epoll_wait 等待 socket 上有数据可读。
  3. 当 socket 上有数据可读时,epoll_wait 通知主线程。主线程从 socket 循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列。
  4. 睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往 epoll 内核事件表中注册 socket 上的写就绪事件。
  5. 主线程调用 epoll_wait 等待 socket 可写。
  6. 当 socket 可写时,epoll_wait 通知主线程。主线程往 socket 上写入服务器处理客户请求的结果。

相同点和不同点:

两个模式的相同点,都是对某个 IO 事件的事件通知(即告诉某个模块,这个 IO 操作可以进行或已经完成)。在结构上两者也有相同点:demultiplexor 负责提交 IO 操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调注册处理函数。不同点在于,异步情况下(Proactor),当回调注册的处理函数时,表示 IO 操作已经完成;同步情况下(Reactor),回调注册的处理函数时,表示 IO 设备可以进行某个操作(can read or can write),注册的处理函数这个时候开始提交操作。

2、select、poll、epoll实现

#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <sys/poll.h>
#include <sys/epoll.h>


#include <pthread.h>
 
#define MAXLNE  4096

#define POLL_SIZE	1024

//8m * 4G = 128 , 512
//C10k
void *client_routine(void *arg) { //

	int connfd = *(int *)arg;

	char buff[MAXLNE];

	while (1) {

		int n = recv(connfd, buff, MAXLNE, 0);
        if (n > 0) {
            buff[n] = '\0';
            printf("recv msg from client: %s\n", buff);

	    	send(connfd, buff, n, 0);
        } else if (n == 0) {
            close(connfd);
			break;
        }

	}

	return NULL;
}


int main(int argc, char **argv) 
{
    int listenfd, connfd, n;
    struct sockaddr_in servaddr;
    char buff[MAXLNE];
 
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
 
    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 #if 0
    struct sockaddr_in client;
    socklen_t len = sizeof(client);
    if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }

    printf("========waiting for client's request========\n");
    while (1) {

        n = recv(connfd, buff, MAXLNE, 0);
        if (n > 0) {
            buff[n] = '\0';
            printf("recv msg from client: %s\n", buff);

	    	send(connfd, buff, n, 0);
        } else if (n == 0) {
            close(connfd);
        }
        
        //close(connfd);
    }

#elif 0


    printf("========waiting for client's request========\n");
    while (1) {

		struct sockaddr_in client;
	    socklen_t len = sizeof(client);
	    if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
	        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
	        return 0;
	    }

        n = recv(connfd, buff, MAXLNE, 0);
        if (n > 0) {
            buff[n] = '\0';
            printf("recv msg from client: %s\n", buff);

	    	send(connfd, buff, n, 0);
        } else if (n == 0) {
            close(connfd);
        }
        
        //close(connfd);
    }

#elif 0

	while (1) {

		struct sockaddr_in client;
	    socklen_t len = sizeof(client);
	    if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
	        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
	        return 0;
	    }

		pthread_t threadid;
		pthread_create(&threadid, NULL, client_routine, (void*)&connfd);

    }

#elif 0

	// 
	fd_set rfds, rset, wfds, wset;

	FD_ZERO(&rfds);
	FD_SET(listenfd, &rfds);

	FD_ZERO(&wfds);

	int max_fd = listenfd;

	while (1) {

		rset = rfds;
		wset = wfds;

		int nready = select(max_fd+1, &rset, &wset, NULL, NULL);


		if (FD_ISSET(listenfd, &rset)) { //

			struct sockaddr_in client;
		    socklen_t len = sizeof(client);
		    if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
		        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
		        return 0;
		    }

			FD_SET(connfd, &rfds);

			if (connfd > max_fd) max_fd = connfd;

			if (--nready == 0) continue;

		}

		int i = 0;
		for (i = listenfd+1;i <= max_fd;i ++) {

			if (FD_ISSET(i, &rset)) { // 

				n = recv(i, buff, MAXLNE, 0);
		        if (n > 0) {
		            buff[n] = '\0';
		            printf("recv msg from client: %s\n", buff);

					FD_SET(i, &wfds);

					//reactor
					//send(i, buff, n, 0);
		        } else if (n == 0) { //

					FD_CLR(i, &rfds);
					//printf("disconnect\n");
		            close(i);
					
		        }
				if (--nready == 0) break;
			} else if (FD_ISSET(i, &wset)) {

				send(i, buff, n, 0);
				FD_SET(i, &rfds);
			
			}

		}
		

	}

#elif 0


	struct pollfd fds[POLL_SIZE] = {0};
	fds[0].fd = listenfd;
	fds[0].events = POLLIN;

	int max_fd = listenfd;
	int i = 0;
	for (i = 1;i < POLL_SIZE;i ++) {
		fds[i].fd = -1;
	}

	while (1) {

		int nready = poll(fds, max_fd+1, -1);

	
		if (fds[0].revents & POLLIN) {

			struct sockaddr_in client;
		    socklen_t len = sizeof(client);
		    if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
		        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
		        return 0;
		    }

			printf("accept \n");
			fds[connfd].fd = connfd;
			fds[connfd].events = POLLIN;

			if (connfd > max_fd) max_fd = connfd;

			if (--nready == 0) continue;
		}

		//int i = 0;
		for (i = listenfd+1;i <= max_fd;i ++)  {

			if (fds[i].revents & POLLIN) {
				
				n = recv(i, buff, MAXLNE, 0);
		        if (n > 0) {
		            buff[n] = '\0';
		            printf("recv msg from client: %s\n", buff);

					send(i, buff, n, 0);
		        } else if (n == 0) { //

					fds[i].fd = -1;

		            close(i);
					
		        }
				if (--nready == 0) break;

			}

		}

	}

#else

	//poll/select --> 
	// epoll_create 
	// epoll_ctl(ADD, DEL, MOD)
	// epoll_wait

	int epfd = epoll_create(1); //int size

	struct epoll_event events[POLL_SIZE] = {0};
	struct epoll_event ev;

	ev.events = EPOLLIN;
	ev.data.fd = listenfd;

	epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

	while (1) {

		int nready = epoll_wait(epfd, events, POLL_SIZE, 5);
		if (nready == -1) {
			continue;
		}

		int i = 0;
		for (i = 0;i < nready;i ++) {

			int clientfd =  events[i].data.fd;
			if (clientfd == listenfd) {

				struct sockaddr_in client;
			    socklen_t len = sizeof(client);
			    if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
			        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
			        return 0;
			    }

				printf("accept\n");
				ev.events = EPOLLIN;
				ev.data.fd = connfd;
				epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

			} else if (events[i].events & EPOLLIN) {

				n = recv(clientfd, buff, MAXLNE, 0);
		        if (n > 0) {
		            buff[n] = '\0';
		            printf("recv msg from client: %s\n", buff);

					send(clientfd, buff, n, 0);
		        } else if (n == 0) { //


					ev.events = EPOLLIN;
					ev.data.fd = clientfd;

					epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);

		            close(clientfd);
					
		        }

			}

		}

	}
	

#endif
 
    close(listenfd);
    return 0;
}


3、reactor模型实现

#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <sys/poll.h>
#include <sys/epoll.h>


#include <pthread.h>
 
#define MAXLNE  4096

#define POLL_SIZE	1024

#define BUFFER_LENGTH		1024
#define MAX_EPOLL_EVENT		1024

#define NOSET_CB	0
#define READ_CB		1
#define WRITE_CB	2
#define ACCEPT_CB	3


typedef int NCALLBACK(int fd, int event, void *arg);


struct nitem { // fd

	int fd;

	int status;
	int events;
	void *arg;
#if 0
	NCALLBACK callback;
#else
	NCALLBACK *readcb;   // epollin
	NCALLBACK *writecb;  // epollout
	NCALLBACK *acceptcb; // epollin
#endif
	unsigned char sbuffer[BUFFER_LENGTH]; //
	int slength;

	unsigned char rbuffer[BUFFER_LENGTH];
	int rlength;
	
};

struct itemblock {

	struct itemblock *next;
	struct nitem *items;

};

struct reactor {

	int epfd;
	struct itemblock *head; 

};

int init_reactor(struct reactor *r);

int read_callback(int fd, int event, void *arg);

int write_callback(int fd, int event, void *arg);

int accept_callback(int fd, int event, void *arg);


struct reactor *instance = NULL;

struct reactor *getInstance(void) { //singleton

	if (instance == NULL) {

		instance = malloc(sizeof(struct reactor));
		if (instance == NULL) return NULL;
		memset(instance, 0, sizeof(struct reactor));

		if (0 > init_reactor(instance)) {
			free(instance);
			return NULL;
		}

	}

	return instance;
}



int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {

	struct reactor *r = getInstance();
	
	struct epoll_event ev = {0};
	
	if (event == READ_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].readcb = cb;
		r->head->items[fd].arg = arg;

		ev.events = EPOLLIN;
		
	} else if (event == WRITE_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].writecb = cb;
		r->head->items[fd].arg = arg;

		ev.events = EPOLLOUT;
	} else if (event == ACCEPT_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].acceptcb = cb;
		r->head->items[fd].arg = arg;

		ev.events = EPOLLIN;
	}

	ev.data.ptr = &r->head->items[fd];

	
	if (r->head->items[fd].events == NOSET_CB) {
		if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
			return -1;
		}
		r->head->items[fd].events = event;
	} else if (r->head->items[fd].events != event) {

		if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_MOD failed\n");
			return -1;
		}
		r->head->items[fd].events = event;
	}

	
	return 0;
}

int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg) {

	struct reactor *r = getInstance();
	
	struct epoll_event ev = {0};
	ev.data.ptr = arg;

	epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
	r->head->items[fd].events = 0;

	return 0;
}



int write_callback(int fd, int event, void *arg) {

	struct reactor *R = getInstance();
	
	unsigned char *sbuffer = R->head->items[fd].sbuffer;
	int length = R->head->items[fd].slength;

	int ret = send(fd, sbuffer, length, 0);

	if (ret < length) {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	} else {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	}
	return 0;
}

// 5k qps

int read_callback(int fd, int event, void *arg) {

	struct reactor *R = getInstance();

	unsigned char *buffer = R->head->items[fd].rbuffer;

	
#if 0 //ET
	int idx = 0, ret = 0;
	while (idx < BUFFER_LENGTH) {

		ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
		if (ret == -1) { 
			break;
		} else if (ret > 0) {
			idx += ret;
		} else {// == 0
			break;
		}

	}

	if (idx == BUFFER_LENGTH && ret != -1) {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	} else if (ret == 0) {
		nreactor_set_event
		//close(fd);
	} else {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
	
#else //LT

	int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
	if (ret == 0) { // fin
		
		nreactor_del_event(fd, NULL, 0, NULL);
		close(fd);
		
	} else if (ret > 0) {

		unsigned char *sbuffer = R->head->items[fd].sbuffer;
		memcpy(sbuffer, buffer, ret);
		R->head->items[fd].slength = ret;

		printf("readcb: %s\n", sbuffer);
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
		
#endif

	

}


// web server 
// ET / LT
int accept_callback(int fd, int event, void *arg) {

	int connfd;
	struct sockaddr_in client;
    socklen_t len = sizeof(client);
    if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }

	nreactor_set_event(connfd, read_callback, READ_CB, NULL);

}



int init_server(int port) {

	int listenfd;
    struct sockaddr_in servaddr;
    char buff[MAXLNE];
 
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);
 
    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
	return listenfd;

}

int init_reactor(struct reactor *r) {

	if (r == NULL) return -1;

	int epfd = epoll_create(1); //int size
	r->epfd = epfd;

	// fd --> item
	r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
	if (r->head == NULL) {
		close(epfd);
		return -2;
	} 
	memset(r->head, 0, sizeof(struct itemblock));

	r->head->items = malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
	if (r->head->items == NULL) {
		free(r->head);
		close(epfd);
		return -2;
	}
	memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
	
	r->head->next = NULL;
	
	return 0;
}

// accept --> EPOLL
int reactor_loop(int listenfd) {

	struct reactor *R = getInstance();
	
	struct epoll_event events[POLL_SIZE] = {0};
	while (1) {

		int nready = epoll_wait(R->epfd, events, POLL_SIZE, 5);
		if (nready == -1) {
			continue;
		}

		int i = 0;
		for (i = 0;i < nready;i ++) {
			
			struct nitem *item = (struct nitem *)events[i].data.ptr;
			int connfd = item->fd;

			if (connfd == listenfd) { //
				item->acceptcb(listenfd, 0, NULL);
			} else {
			
				if (events[i].events & EPOLLIN) { //
					item->readcb(connfd, 0, NULL);
				
				} 
				if (events[i].events & EPOLLOUT) {
					item->writecb(connfd, 0, NULL);
		
				}
			}
		}

	}

	return 0;
}


int main(int argc, char **argv) 
{
    
 	int  connfd, n;

	int listenfd = init_server(9999);
	nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);

	//nreactor_set_event(listenfd, accept_callback, read_callback, write_callback);
	

	reactor_loop(listenfd);
	 
    return 0;
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/735782.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

可恶的剪绳子问题

1. 剑指 Offer 14- I. 剪绳子 题目描述&#xff1a;给你一根长度为 n 的绳子&#xff0c;请把绳子剪成整数长度的 m 段&#xff08;m、n都是整数&#xff0c;n>1并且m>1&#xff09;&#xff0c;每段绳子的长度记为 k[0],k[1]…k[m-1] 。请问 k[0]k[1]…*k[m-1] 可能的最…

3.6.共享内存的学习

目录 前言1. 共享内存2. shared memory案例3. 补充知识总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程&#xff0c;之前有看过一遍&#xff0c;但是没有做笔记&#xff0c;很多东西也忘了。这次重新撸一遍&#xff0c;顺便记记笔记。 本次课程学习精简 CUDA 教程-共享…

基于Qt5 实现的简易慕课爬取程序

基于Qt5 实现的简易慕课爬取程序 一、项目概述二、源代码 一、项目概述 名称&#xff1a;MookScrapy 这个项目主要是使用了 Qt 里面的 QNetworkAccessManager 去下载慕课网站的数据 https://coding.imooc.com&#xff0c;也就是这个网站里面的卡片信息。然后做一定的分析和展示…

【架构设计】酒店预订应用的系统设计架构(如 Airbnb、OYO)

Airbnb、Booking.com 和 OYO 等酒店预订应用程序如何提供从酒店列表到预订再到付款的流畅流程&#xff1f;而且都没有一个小故障&#xff01;在此博客中&#xff0c;您将获得对此的详细解释。由于它们非常庞大&#xff0c;以至于它们需要处理大量的用户流量。所以要管理这些&am…

计算机视觉:卷积核的参数可以通过反向传播学习到吗?

本文重点 在深度学习中,卷积神经网络(Convolutional Neural Networks, CNN)是一种常用的神经网络结构,其中卷积核是CNN的核心组件之一。卷积核是一个小矩阵,用于对输入数据进行卷积操作。卷积操作可以提取输入数据的特征,通过不同的卷积核可以提取不同的特征。 在前面课…

Wireshark TS | 二谈访问网页失败

前言 又一个访问网页失败的案例&#xff0c;该案例来自于 Wireshark sharkfest 2018 - Point And ShootPacket&#xff0c;其中的 Case 2 Cannot see homepage&#xff0c;描述的是来自 OSAKA 的用户抱怨访问一些网站页面不能显示&#xff0c;但是另外一些网站页面可以&#x…

软件测试岗位新标准:ISTQB认证与软件测试工程师职业发展

随着信息技术的飞速发展&#xff0c;软件测试行业也变得越来越重要。软件测试是保证软件质量的关键环节&#xff0c;因此&#xff0c;软件测试工程师的岗位也越来越受到重视。 ISTQB认证成为了衡量软件测试工程师职业能力的标准。 下面领测国际ISTQB考试认证中心就带您了解一下…

depot_tools问题记录 - 执行fetch/gclient命令无响应

文章目录 前言开发环境问题描述问题分析解决方案最后 前言 在研究将Dart dill文件序列化为可读文本时遇到的问题。 开发环境 macOS: 13.4 问题描述 之前使用depot_tools中的fetch/gclient命令还是正常的&#xff0c;今天想实测--no-history参数时突然遇到命令无响应的情况…

Redis 删除 key用 del 和 unlink 有啥区别?

问题 del 和 unlink 有啥区别啊&#xff1f;为什么String类型删除不会做异步删除&#xff1f; 彬彬回答 DEL 和 UNLINK 都是同步的释放 key 对象&#xff0c;区别是怎么释放后面的 value 对象 DEL 每次都是同步释放 value 部分&#xff0c;如果 value 很大&#xff0c;例如一…

Openssh升级方法详解

项目组linux服务器被绿盟扫描出openssh 1.0.2版本有漏洞&#xff0c;需要升级到7.5版本&#xff0c;以下是升级过程&#xff1a; 第一步 安装Telnet服务 先下Openssh软件包 看你需要什么版本http://ftp.openbsd.org/pub/OpenBSD/OpenSSH/portable/ 1.查看当前的ssh服务版本 …

【前端】网页开发精讲与实战 HTML Day 2

&#x1f680;Write In Front&#x1f680; &#x1f4dd;个人主页&#xff1a;令夏二十三 &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd; &#x1f4e3;系列专栏&#xff1a;前端 &#x1f4ac;总结&#xff1a;希望你看完之后&#xff0c;能对你有…

看完这篇 教你玩转渗透测试靶机Vulnhub——The Planets:Mercury

Vulnhub靶机The Planets:Mercury渗透测试详解 Vulnhub靶机介绍&#xff1a;Vulnhub靶机下载&#xff1a;Vulnhub靶机安装&#xff1a;Vulnhub靶机漏洞详解&#xff1a;①&#xff1a;信息收集&#xff1a;②&#xff1a;漏洞发现&#xff1a;③&#xff1a;SSH登入&#xff1a;…

架构师进阶之路 - 微服务怎么划分

目录 微服务划分目标 业务、技术、团队导向规划服务 领域检查 依赖DAG检查 分布式事务检查 性能分布检查 稳定&#xff08;易变&#xff09;性检查 调用链检查 微服务划分目标 我们常说服务的合理划分是微服务成功的重中之重&#xff0c;一个合理的服务划分应该符合一下…

SQL中如何用快照,恢复被误删的数据?

什么是快照 数据库快照是sql server 2005的一个新功能。MSDN上对它的定义是&#xff1a; 数据库快照是数据库&#xff08;称为“源数据库”&#xff09;的只读静态视图。在创建时&#xff0c;每个数据库快照在事务上都与源数据库一致。在创建数据库快照时&#xff0c;源数据库…

THREE.JS镜头随鼠标晃动效果

为了让动画更灵活并且简单 借助gsap让其具有更多可能&#xff0c;在未来更容易扩充其他动效 gsap Dom跟随鼠标移动 gsap.quickTo() 首先要监听鼠标移动&#xff0c;并且将移动的值转换到 -1 和 1 之间 方便处理 private mousemove(e: MouseEvent) {const x (e.clientX / inner…

spring10-配置数据元

他的作用是提高我们程序性能的&#xff1a;我们怎么用呢&#xff01;先创建我们数据源对象&#xff1a;创建初始化对象之后&#xff0c;创建数据源对象之后&#xff0c;会给我们一些初始化资源。 使用完后还给他 &#xff0c;这是一种环保的思想。 常见的数据源&#xff1a;底…

干货-卷起来,企业级web自动化测试实战落地(二)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 WebDriver的基本使…

毫米波雷达 TI IWR1443 测试官方程序(Out Of Box Demo)

IWR1443 windows 文章目录 1、准备工作1.1、mmWave SDK1.2、Code Composer Studio&#xff08;CCS&#xff09;1.3、Uniflash1.4、TI Cloud Agent 2、导入工程3、烧录3.1、先将 IWR1443 调到 Flashing Mode3.2、使用 UniFlash 软件 4、运行GUI4.1、IWR1443 调到 Functional Mo…

【计算机组成与体系结构Ⅰ】实验0 Logisim 入门实验

一、实验目的 1&#xff1a;掌握加减法器工作原理。 2&#xff1a;能够设计出一个n位加减法器。 3&#xff1a;熟悉Logisim软件使用。 二、实验环境 &#xff08;1&#xff09;Logisim 2.7.1 &#xff08;2&#xff09;Microsoft Windows 10 三、实验内容 1&#xff1a;设…

FastAPI中如何正确理解和使用:async和await

1 缘起 项目需要, 技术选型使用FastAPI。 开发过程中,遇到需要异步操作的场景, 查阅相关FastAPI异步信息的过程中,发现了async和await组合技, 通过阅读官方文档和实际测试,发现,async和await并不是传统意义上的异步(如线程池异步执行任务), async和await的融合技是应…