1、什么是Reactor模型
Reactor意思是“反应堆”,是一种事件驱动机制。
和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。
对于刚开始接触这个机制,个人感觉翻译成“感应器”可能会更好理解一点,因为注册在Reactor上的函数就像感应器一样,只要有事件到达,就会触发它开始工作。
Reactor 模式是编写高性能网络服务器的必备技术之一。
2、Reactor模型的优点
- 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的;
- 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
- 可扩展性强,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
- 可复用性高,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;
Reactor 模型开发效率上比起直接使用 IO 复用要高,它通常是单线程的,设计目标是希望单线程使用一颗 CPU 的全部资源。优点即每个事件处理中很多时候可以不考虑共享资源的互斥访问。可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力,当程序需要使用多核资源时,Reactor 模型就会悲剧 , 为什么呢?如果程序业务很简单,例如只是简单的访问一些提供了并发访问的服务,就可以直接开启多个反应堆,每个反应堆对应一颗 CPU 核心,这些反应堆上跑的请求互不相关,这是完全可以利用多核的。例如 Nginx 这样的 http 静态服务器。
3、通过对网络编程(epoll)代码的优化,深入理解Reactor模型
1、epoll的普通版本,根据fd类型(listen_fd和client_fd)分为两大类处理。
如果是listen_fd,调用accept处理连接请求;
如果是client_fd,调用recv或者send处理数据。
代码实现:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
int main(int argc, char* argv[])
{
if (argc < 2)
return -1;
int port = atoi(argv[1]); //字符串转换为整型
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
return -1;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in)); //新申请的空间一定要置零
addr.sin_family = AF_INET;
addr.sin_port = htons(port); //转换成网络字节序
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
return -2;
if (listen(sockfd, 5) < 0)
return -3;
//epoll
int epfd = epoll_create(1); //创建epoll,相当于红黑树的根节点
struct epoll_event ev, events[1024] = {0}; //events相当于就绪队列,一次性可以处理的集合
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); //将ev节点加入到epoll,此处的sockfd参数随便添加没有意义,需要操作系统索引和它有对应的句柄
while (1)
{
int nready = epoll_wait(epfd, events, 1024, -1); //第四个参数-1表示一直等待,有事件才返回
if (nready < 1) //没有事件触发,nready代表触发事件的个数
break;
int i = 0;
for (i = 0; i < nready; i++) //epoll_wait带出的就绪fd包括两大类:1、处理连接的listen_fd,2、处理数据的send和recv
{
if (events[i].data.fd == sockfd) //如果是listenfd,就将它加入到epoll
{
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd <= 0)
continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recv from IP = %s ,at Port= %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port));
ev.events = EPOLLIN | EPOLLET; //epoll默认是LT模式
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);
}
else //fd进行读写操作
{
//对fd的读写操作没有分开
int client_fd = events[i].data.fd;
char buf[1024] = {0};
int ret = recv(client_fd, buf, 1024, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//
}
else
{
//
}
printf("ret < 0,断开连接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev);
}
else if (ret == 0) //接收到了客户端发来的断开连接请求FIN后,没有及时调用close函数,进入了CLOSE _WAIT状态
{
printf("ret = 0,断开连接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev); //close关闭连接后要将它既是从epoll中删除
}
else
{
printf("Recv: %s, %d Bytes\n", buf, ret);
}
//区分fd的读写操作,即recv和send
if (events[i].events & EPOLLIN)
{
int client_fd = events[i].data.fd;
char buf[1024] = {0};
int ret = recv(client_fd, buf, 1024, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//...
}
else
{
//...
}
printf("ret < 0,断开连接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev);
}
else if (ret == 0) //接收到了客户端发来的断开连接请求FIN后,没有及时调用close函数,进入了CLOSE _WAIT状态
{
printf("ret = 0,断开连接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev); //close关闭连接后要将它既是从epoll中删除
}
else
{
printf("Recv: %s, %d Bytes\n", buf, ret);
}
}
if (events[i].events & EPOLLOUT) //为什么需要判断EPOLLOUT,而不是直接else?因为一个fd有可能同时存在可读和可写事件的
{
int client_fd = events[i].data.fd;
char buf[1024] = {0};
send(client_fd, buf, sizeof(buf), 0);
}
}
}
}
return 0;
}
2、epoll的优化版本,根据事件类型(读和写)分为两大类处理。
代码实现:
for (i = 0; i < nready; i++) //epoll_wait带出的就绪fd包括两大类:1、处理连接的listen_fd,2、处理数据的send和recv
{
//区分fd的读写操作
if (events[i].events & EPOLLIN)
{
if (events[i].data.fd == sockfd) //如果是listenfd,就将它加入到epoll
{
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd <= 0)
continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recv from IP = %s ,at Port= %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port));
ev.events = EPOLLIN | EPOLLET; //epoll默认是LT模式
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);
}
else
{
int client_fd = events[i].data.fd;
char buf[1024] = {0};
int ret = recv(client_fd, buf, 1024, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//...
}
else
{
//...
}
printf("ret < 0,断开连接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev);
}
else if (ret == 0) //接收到了客户端发来的断开连接请求FIN后,没有及时调用close函数,进入了CLOSE _WAIT状态
{
printf("ret = 0,断开连接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev); //close关闭连接后要将它既是从epoll中删除
}
else
{
printf("Recv: %s, %d Bytes\n", buf, ret);
}
}
}
//为什么需要判断EPOLLOUT,而不是直接else?因为一个fd有可能同时存在可读和可写事件的
if (events[i].events & EPOLLOUT)
{
int client_fd = events[i].data.fd;
char buf[1024] = {0};
send(client_fd, buf, sizeof(buf), 0);
}
}
3、epoll的Reactor模式, epoll由以前的对网络io(fd)进行管理,转变成对events事件进行管理。
代码实现:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
//每个fd所对应的信息
struct sockitem
{
int sockfd;
int (*callback)(int fd, int events, void*arg);
char sendbuf[1024];
char recvbuf[1024];
};
//每个epoll所对应的信息
struct epollitem
{
int epfd;
struct epoll_event events[1024]; //events相当于就绪队列,一次性可以处理的集合
};
struct epollitem *eventloop = NULL;
int recv_cb(int fd, int events, void*arg);
int send_cb(int fd, int events, void*arg);
int accept_cb(int fd, int events, void*arg)
{
printf("---accept_cb(int fd, int events, void*arg)---\n");
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd <= 0)
return -1;
char str[INET_ADDRSTRLEN] = {0};
printf("recv from IP = %s ,at Port= %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port));
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; //epoll默认是LT模式
struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = client_fd;
si->callback = recv_cb;
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, client_fd, &ev);
return client_fd;
}
int recv_cb(int fd, int events, void*arg)
{
printf("---recv_cb(int fd, int events, void*arg)---\n");
struct epoll_event ev;
struct sockitem *sit = (struct sockitem*)arg;
int ret = recv(fd, sit->recvbuf, 1024, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//...
}
else
{
//...
}
printf("ret < 0,断开连接:%d\n", fd);
ev.events = EPOLLIN;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev); //close关闭连接后要将它既是从epoll中删除
close(fd);
free(sit); //连接关闭后释放内存
}
else if (ret == 0) //接收到了客户端发来的断开连接请求FIN后,没有及时调用close函数,进入了CLOSE _WAIT状态
{
printf("ret = 0,断开连接:%d\n", fd);
ev.events = EPOLLIN;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
close(fd);
free(sit);
}
else
{
printf("Recv from recvbuf: %s, %d Bytes\n", sit->recvbuf, ret);
ev.events = EPOLLIN | EPOLLOUT; //
sit->sockfd = fd;
sit->callback = send_cb;
ev.data.ptr = sit;
epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
}
return ret;
}
int send_cb(int fd, int events, void*arg)
{
struct epoll_event ev;
struct sockitem *sit = (struct sockitem*)arg;
strncpy(sit->sendbuf, sit->recvbuf, sizeof(sit->recvbuf) + 1);
send(fd, sit->sendbuf, sizeof(sit->recvbuf) + 1, 0);
ev.events = EPOLLIN | EPOLLET; //
sit->sockfd = fd;
sit->callback = recv_cb;
ev.data.ptr = sit;
epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
return fd;
}
int main(int argc, char* argv[])
{
if (argc < 2)
return -1;
int port = atoi(argv[1]); //字符串转换为整型
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
return -1;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in)); //新申请的空间一定要置零
addr.sin_family = AF_INET;
addr.sin_port = htons(port); //转换成网络字节序
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
return -2;
if (listen(sockfd, 5) < 0)
return -3;
//epoll
eventloop = (struct epollitem *)malloc(sizeof(struct epollitem));
eventloop->epfd = epoll_create(1); //创建epoll,相当于红黑树的根节点
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = sockfd;
si->callback = accept_cb;
ev.data.ptr = si; //将fd和对应的回调函数绑定一起带进epoll
epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev); //将ev节点加入到epoll,此处的sockfd参数随便添加没有意义,需要操作系统索引和它有对应的句柄
while (1)
{
int nready = epoll_wait(eventloop->epfd, eventloop->events, 1024, -1); //第四个参数-1表示一直等待,有事件才返回
if (nready < 1) //没有事件触发,nready代表触发事件的个数
break;
int i = 0;
for (i = 0; i < nready; i++)
{
//区分fd的读写操作
if (eventloop->events[i].events & EPOLLIN)
{
struct sockitem *sit = (struct sockitem*)eventloop->events[i].data.ptr;
sit->callback(sit->sockfd, eventloop->events[i].events, sit); //不用区分listen_fd和recv_fd,相应的fd都会调用他们所对应的callback
}
//为什么需要判断EPOLLOUT,而不是直接else?因为一个fd有可能同时存在可读和可写事件的
if (eventloop->events[i].events & EPOLLOUT)
{
struct sockitem *sit = (struct sockitem*)eventloop->events[i].data.ptr;
sit->callback(sit->sockfd, eventloop->events[i].events, sit);
}
}
}
return 0;
}
4、Reactor模型的应用
1、单线程模式的Reactor,参考libevent、redis;
2、多线程模式的Reactor,参考memcached;
3、多进程模式的Reactor,参考nginx。