文章目录
- 1、网络IO模型
- 1.1、阻塞IO(blocking IO)
- 1.2、非阻塞IO(non-blocking IO)
- 1.3、多路复用 IO(IO multiplexing)
- 1.4、异步 IO(Asynchronous I/O)
- 1.5、信号驱动 IO(signal driven I/O, SIGIO)
- 1.6、服务器模型 Reactor 与 Proactor
- Reactor 模型
- 同步 I/O 模拟 Proactor 模型
- 2、select、poll、epoll实现
- 3、reactor模型实现
1、网络IO模型
1.1、阻塞IO(blocking IO)
在linux 中,默认情况所有的socket都是blocking,一个典型的读操作流程
当用户进程调用了 read 这个系统调用,kernel 就开始了 IO 的第一个阶段:准备数据。对于network io 来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的数据包),这个时候 kernel 就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当 kernel一直等到数据准备好了,它就会将数据从 kernel 中拷贝到用户内存,然后 kernel 返回结果,用户进程才解除 block 的状态,重新运行起来。所以,blocking IO 的特点就是在 IO 执行的两个阶段(等待数据和拷贝数据两个阶段)都被block 了。几乎所有的程序员第一次接触到的网络编程都是从 listen()、send()、recv() 等接口开始的,这些接口都是阻塞型的。使用这些接口可以很方便的构建服务器/客户机的模型。下面是一个简单地“一问一答”的服务器。
1.2、非阻塞IO(non-blocking IO)
Linux 下,可以通过设置 socket 使其变为 non-blocking。当对一个 non-blocking socket 执行读
操作时,流程是这个样子:
当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那
么它并不会 block 用户进程,而是立刻返回一个 error。从用户进程角度讲 ,它发起一个read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回,所以,在非阻塞式 IO 中,用户进程其实是需要不断的主动询问 kernel数据准备好了没有。
在非阻塞状态下,recv() 接口在被调用后立即返回,返回值代表了不同的含义。如在本例中,非阻塞的接口相比于阻塞型接口的显著差异在于,在被调用之后立即返回。使用如下的函数可以将某句柄 fd 设为非阻塞状态。
1.3、多路复用 IO(IO multiplexing)
IO multiplexing 这个词可能有点陌生,但是提到 select/epoll,大概就都能明白了。有些地方也称这种 IO 方式为事件驱动 IO(event driven IO)。我们都知道,select/epoll 的好处就在于单个 process 就可以同时处理多个网络连接的 IO。它的基本原理就是 select/epoll 这个 function会不断的轮询所负责的所有 socket,当某个 socket 有数据到达了,就通知用户进程。
当用户进程调用了 select,那么整个进程会被 block,而同时,kernel 会“监视”所有 select 负责的 socket,当任何一个 socket 中的数据准备好了,select 就会返回。这个时候用户进程再调用 read 操作,将数据从 kernel 拷贝到用户进程。这个图和 blocking IO 的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select 和 read),而 blocking IO 只调用了一个系统调用(read)。但是使用 select 以后最大的优势是用户可以在一个线程内同时处理多个 socket 的 IO 请求。用户可以注册多个 socket,然后不断地调用 select 读取被激活的 socket,即可达到在同一个线程内同时处理多个 IO 请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。(多说一句:所以,如果处理的连接数不是很高的话,使用select/epoll 的 web server 不一定比使用 multi-threading + blocking IO 的 web server 性能更好,可能延迟还更大。select/epoll 的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
1.4、异步 IO(Asynchronous I/O)
Linux 下的 asynchronous IO 用在磁盘 IO 读写操作,不用于网络 IO,从内核 2.6 版本才开始引入。先看一下它的流程
用户进程发起 read 操作之后,立刻就可以开始去做其它的事。而另一方面,从 kernel的角度,当它受到一个 asynchronous read 之后,首先它会立刻返回,所以不会对用户进程产生任何 block。然后,kernel 会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel 会给用户进程发送一个 signal,告诉它 read 操作完成了。
1.5、信号驱动 IO(signal driven I/O, SIGIO)
首先我们允许套接口进行信号驱动 I/O,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个 SIGIO 信号,可以在信号处理函数中调用 I/O 操作函数处理数据。当数据报准备好读取时,内核就为该进程产生一个 SIGIO 信号。我们随后既可以在信号处理函数中调用 read 读取数据报,并通知主循环数据已准备好待处理,也可以立即通知主循环,让它来读取数据报。无论如何处理 SIGIO 信号,这种模型的优势在于等待数据报到达(第一阶段)期间,进程可以继续执行,不被阻塞。免去了 select 的阻塞与轮询,当有活跃套接字时,由注册的 handler 处理。
1.6、服务器模型 Reactor 与 Proactor
对高并发编程,网络连接上的消息处理,可以分为两个阶段:等待消息准备好、消息处理。当使用默认的阻塞套接字时(例如上面提到的 1 个线程捆绑处理 1 个连接),往往是把这两个阶段合而为一,这样操作套接字的代码所在的线程就得睡眠来等待消息准备好,这导致了高并发下线程会频繁的睡眠、唤醒,从而影响了 CPU 的使用效率。高并发编程方法当然就是把两个阶段分开处理。即,等待消息准备好的代码段,与处理消息的代码段是分离的。当然,这也要求套接字必须是非阻塞的,否则,处理消息的代码段很容易导致条件不满足时,所在线程又进入了睡眠等待阶段。那么问题来了,等待消息准备好这个阶段怎么实现?它毕竟还是等待,这意味着线程还是要睡眠的!解决办法就是,线程主动查询,或者让 1 个线程为所有连接而等待!这就是 IO 多路复用了。多路复用就是处理等待消息准备好这件事的,但它可以同时处理多个连接!它也可能“等待”,所以它也会导致线程睡眠,然而这不要紧,因为它一对多、它可以监控所有连接。这样,当我们的线程被唤醒执行时,就一定是有一些连接准备好被我们的代码执行了。作为一个高性能服务器程序通常需要考虑处理三类事件: I/O 事件,定时事件及信号。两种高效的事件处理模型:Reactor 和 Proactor。
Reactor 模型
首先来回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将结果和控制权返回给程序,程序继续处理。Reactor 释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的时间发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函数”。Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
同步 I/O 模拟 Proactor 模型
- 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
- 主线程调用 epoll_wait 等待 socket 上有数据可读。
- 当 socket 上有数据可读时,epoll_wait 通知主线程。主线程从 socket 循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列。
- 睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往 epoll 内核事件表中注册 socket 上的写就绪事件。
- 主线程调用 epoll_wait 等待 socket 可写。
- 当 socket 可写时,epoll_wait 通知主线程。主线程往 socket 上写入服务器处理客户请求的结果。
相同点和不同点:
两个模式的相同点,都是对某个 IO 事件的事件通知(即告诉某个模块,这个 IO 操作可以进行或已经完成)。在结构上两者也有相同点:demultiplexor 负责提交 IO 操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调注册处理函数。不同点在于,异步情况下(Proactor),当回调注册的处理函数时,表示 IO 操作已经完成;同步情况下(Reactor),回调注册的处理函数时,表示 IO 设备可以进行某个操作(can read or can write),注册的处理函数这个时候开始提交操作。
2、select、poll、epoll实现
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAXLNE 4096
#define POLL_SIZE 1024
//8m * 4G = 128 , 512
//C10k
void *client_routine(void *arg) { //
int connfd = *(int *)arg;
char buff[MAXLNE];
while (1) {
int n = recv(connfd, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(connfd, buff, n, 0);
} else if (n == 0) {
close(connfd);
break;
}
}
return NULL;
}
int main(int argc, char **argv)
{
int listenfd, connfd, n;
struct sockaddr_in servaddr;
char buff[MAXLNE];
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9999);
if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
if (listen(listenfd, 10) == -1) {
printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
#if 0
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
printf("========waiting for client's request========\n");
while (1) {
n = recv(connfd, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(connfd, buff, n, 0);
} else if (n == 0) {
close(connfd);
}
//close(connfd);
}
#elif 0
printf("========waiting for client's request========\n");
while (1) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
n = recv(connfd, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(connfd, buff, n, 0);
} else if (n == 0) {
close(connfd);
}
//close(connfd);
}
#elif 0
while (1) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
pthread_t threadid;
pthread_create(&threadid, NULL, client_routine, (void*)&connfd);
}
#elif 0
//
fd_set rfds, rset, wfds, wset;
FD_ZERO(&rfds);
FD_SET(listenfd, &rfds);
FD_ZERO(&wfds);
int max_fd = listenfd;
while (1) {
rset = rfds;
wset = wfds;
int nready = select(max_fd+1, &rset, &wset, NULL, NULL);
if (FD_ISSET(listenfd, &rset)) { //
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
FD_SET(connfd, &rfds);
if (connfd > max_fd) max_fd = connfd;
if (--nready == 0) continue;
}
int i = 0;
for (i = listenfd+1;i <= max_fd;i ++) {
if (FD_ISSET(i, &rset)) { //
n = recv(i, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
FD_SET(i, &wfds);
//reactor
//send(i, buff, n, 0);
} else if (n == 0) { //
FD_CLR(i, &rfds);
//printf("disconnect\n");
close(i);
}
if (--nready == 0) break;
} else if (FD_ISSET(i, &wset)) {
send(i, buff, n, 0);
FD_SET(i, &rfds);
}
}
}
#elif 0
struct pollfd fds[POLL_SIZE] = {0};
fds[0].fd = listenfd;
fds[0].events = POLLIN;
int max_fd = listenfd;
int i = 0;
for (i = 1;i < POLL_SIZE;i ++) {
fds[i].fd = -1;
}
while (1) {
int nready = poll(fds, max_fd+1, -1);
if (fds[0].revents & POLLIN) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
printf("accept \n");
fds[connfd].fd = connfd;
fds[connfd].events = POLLIN;
if (connfd > max_fd) max_fd = connfd;
if (--nready == 0) continue;
}
//int i = 0;
for (i = listenfd+1;i <= max_fd;i ++) {
if (fds[i].revents & POLLIN) {
n = recv(i, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(i, buff, n, 0);
} else if (n == 0) { //
fds[i].fd = -1;
close(i);
}
if (--nready == 0) break;
}
}
}
#else
//poll/select -->
// epoll_create
// epoll_ctl(ADD, DEL, MOD)
// epoll_wait
int epfd = epoll_create(1); //int size
struct epoll_event events[POLL_SIZE] = {0};
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listenfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
while (1) {
int nready = epoll_wait(epfd, events, POLL_SIZE, 5);
if (nready == -1) {
continue;
}
int i = 0;
for (i = 0;i < nready;i ++) {
int clientfd = events[i].data.fd;
if (clientfd == listenfd) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
printf("accept\n");
ev.events = EPOLLIN;
ev.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
} else if (events[i].events & EPOLLIN) {
n = recv(clientfd, buff, MAXLNE, 0);
if (n > 0) {
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
send(clientfd, buff, n, 0);
} else if (n == 0) { //
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);
close(clientfd);
}
}
}
}
#endif
close(listenfd);
return 0;
}
3、reactor模型实现
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAXLNE 4096
#define POLL_SIZE 1024
#define BUFFER_LENGTH 1024
#define MAX_EPOLL_EVENT 1024
#define NOSET_CB 0
#define READ_CB 1
#define WRITE_CB 2
#define ACCEPT_CB 3
typedef int NCALLBACK(int fd, int event, void *arg);
struct nitem { // fd
int fd;
int status;
int events;
void *arg;
#if 0
NCALLBACK callback;
#else
NCALLBACK *readcb; // epollin
NCALLBACK *writecb; // epollout
NCALLBACK *acceptcb; // epollin
#endif
unsigned char sbuffer[BUFFER_LENGTH]; //
int slength;
unsigned char rbuffer[BUFFER_LENGTH];
int rlength;
};
struct itemblock {
struct itemblock *next;
struct nitem *items;
};
struct reactor {
int epfd;
struct itemblock *head;
};
int init_reactor(struct reactor *r);
int read_callback(int fd, int event, void *arg);
int write_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg);
struct reactor *instance = NULL;
struct reactor *getInstance(void) { //singleton
if (instance == NULL) {
instance = malloc(sizeof(struct reactor));
if (instance == NULL) return NULL;
memset(instance, 0, sizeof(struct reactor));
if (0 > init_reactor(instance)) {
free(instance);
return NULL;
}
}
return instance;
}
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {
struct reactor *r = getInstance();
struct epoll_event ev = {0};
if (event == READ_CB) {
r->head->items[fd].fd = fd;
r->head->items[fd].readcb = cb;
r->head->items[fd].arg = arg;
ev.events = EPOLLIN;
} else if (event == WRITE_CB) {
r->head->items[fd].fd = fd;
r->head->items[fd].writecb = cb;
r->head->items[fd].arg = arg;
ev.events = EPOLLOUT;
} else if (event == ACCEPT_CB) {
r->head->items[fd].fd = fd;
r->head->items[fd].acceptcb = cb;
r->head->items[fd].arg = arg;
ev.events = EPOLLIN;
}
ev.data.ptr = &r->head->items[fd];
if (r->head->items[fd].events == NOSET_CB) {
if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
return -1;
}
r->head->items[fd].events = event;
} else if (r->head->items[fd].events != event) {
if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
printf("epoll_ctl EPOLL_CTL_MOD failed\n");
return -1;
}
r->head->items[fd].events = event;
}
return 0;
}
int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg) {
struct reactor *r = getInstance();
struct epoll_event ev = {0};
ev.data.ptr = arg;
epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
r->head->items[fd].events = 0;
return 0;
}
int write_callback(int fd, int event, void *arg) {
struct reactor *R = getInstance();
unsigned char *sbuffer = R->head->items[fd].sbuffer;
int length = R->head->items[fd].slength;
int ret = send(fd, sbuffer, length, 0);
if (ret < length) {
nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
} else {
nreactor_set_event(fd, read_callback, READ_CB, NULL);
}
return 0;
}
// 5k qps
int read_callback(int fd, int event, void *arg) {
struct reactor *R = getInstance();
unsigned char *buffer = R->head->items[fd].rbuffer;
#if 0 //ET
int idx = 0, ret = 0;
while (idx < BUFFER_LENGTH) {
ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
if (ret == -1) {
break;
} else if (ret > 0) {
idx += ret;
} else {// == 0
break;
}
}
if (idx == BUFFER_LENGTH && ret != -1) {
nreactor_set_event(fd, read_callback, READ_CB, NULL);
} else if (ret == 0) {
nreactor_set_event
//close(fd);
} else {
nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
}
#else //LT
int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
if (ret == 0) { // fin
nreactor_del_event(fd, NULL, 0, NULL);
close(fd);
} else if (ret > 0) {
unsigned char *sbuffer = R->head->items[fd].sbuffer;
memcpy(sbuffer, buffer, ret);
R->head->items[fd].slength = ret;
printf("readcb: %s\n", sbuffer);
nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
}
#endif
}
// web server
// ET / LT
int accept_callback(int fd, int event, void *arg) {
int connfd;
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
nreactor_set_event(connfd, read_callback, READ_CB, NULL);
}
int init_server(int port) {
int listenfd;
struct sockaddr_in servaddr;
char buff[MAXLNE];
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
if (listen(listenfd, 10) == -1) {
printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
return listenfd;
}
int init_reactor(struct reactor *r) {
if (r == NULL) return -1;
int epfd = epoll_create(1); //int size
r->epfd = epfd;
// fd --> item
r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
if (r->head == NULL) {
close(epfd);
return -2;
}
memset(r->head, 0, sizeof(struct itemblock));
r->head->items = malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
if (r->head->items == NULL) {
free(r->head);
close(epfd);
return -2;
}
memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
r->head->next = NULL;
return 0;
}
// accept --> EPOLL
int reactor_loop(int listenfd) {
struct reactor *R = getInstance();
struct epoll_event events[POLL_SIZE] = {0};
while (1) {
int nready = epoll_wait(R->epfd, events, POLL_SIZE, 5);
if (nready == -1) {
continue;
}
int i = 0;
for (i = 0;i < nready;i ++) {
struct nitem *item = (struct nitem *)events[i].data.ptr;
int connfd = item->fd;
if (connfd == listenfd) { //
item->acceptcb(listenfd, 0, NULL);
} else {
if (events[i].events & EPOLLIN) { //
item->readcb(connfd, 0, NULL);
}
if (events[i].events & EPOLLOUT) {
item->writecb(connfd, 0, NULL);
}
}
}
}
return 0;
}
int main(int argc, char **argv)
{
int connfd, n;
int listenfd = init_server(9999);
nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);
//nreactor_set_event(listenfd, accept_callback, read_callback, write_callback);
reactor_loop(listenfd);
return 0;
}