网络编程: reactor模式的步步探索与实现
- 一.步步探索
- 1.先看一下之前的BUG的影响
- 2.解决拼接式读取问题
- 3.进一步的探索
- 4.Connection的提出
- 5.EpollServer的修改并将监听套接字添加进去
- 6.小演示
- 二.协议与业务登场
- 1.协议,业务,解决粘包,序列反序列化等等的函数模块实现
- 2.读写异常事件的关心策略
- 3.handler.hpp的修改
- 4.client.cpp的快速编写
- 5.演示
- 6.事件派发与提出reactor
- 7.完整代码
- 1.Reactor.hpp
- 2. Connection.hpp
- 3. Epoller.hpp
- 4. handler.hpp
- 5. PacketProcesser.hpp
- 6.协议和业务: Protocal.hpp Translate.hpp
- 7.cpp文件: server和client
- 三.画图进一步理解reactor
- 1.版本1
- 2.版本2
- 3.小总结
回顾上文留下的疑问,这是一个待处理的问题
read的数据粘包,序列反序列化,写事件和异常事件怎么处理?
一.步步探索
1.先看一下之前的BUG的影响
之前我们都是多线程给每个用户提供服务,每个线程对应于一个用户,都在线程函数当中维护了对应的输入输出缓冲区,
所以没有遇到过这种BUG
多路转接使得服务器能够不依赖多线程即可完成同时为多个用户提供服务,那么它就要解决多线程解决过的问题,这是理所当然的
我们先简单地解决数据包粘包问题, 就规定 每条消息固定长度:15字节
buf数组我们搞成大小为20
依旧是echo服务器(发回完整的一条消息(15字节))
因为我们发的消息是字符流,所以没有序列反序列化问题
演示一下:
|Live on hope.|
|Give me time.|
|Show me yours|
|Take it easy!|
|Just a moment|
|Cat dog house|
|Tea pot stand|
这是长度为13的短语,加上两边的|正好15个长度我们到时候就发这个
这种bug是不能容忍的,因此下面我们来解决这一问题
2.解决拼接式读取问题
这一点充分地说明了只要为每个fd维护一个输入输出缓冲区即可修复对应的BUG
大佬在这个角度上又有了更深的思考
某个功能不能写死,如何才能办到呢?
回调函数!!
3.进一步的探索
于是:
4.Connection的提出
每个fd对应于一个用户,也就是一个通信连接,也要有自己的inbuff,outbuff
还要能够访问Epoll_server的Epoller _epoll对象来设置取消关系等等!!
因此大佬对上述的成员进行封装,提出了Connection
写事件的问题我们解决了,但是仔细想想: 这种方法还是写死了啊,只不过是封装一层之后写死的啊
reader和writer应该要由使用方来提供,因此reader,writer,accepter,excepter都要在搞一个回调
因此connection就此成型
#pragma once
#include <functional>
#include <memory>
using namespace std;
class EpollServer; // 前置声明
class Connection;
using func_t = function<void(shared_ptr<Connection>)>;
// connection负责维护一个具体的连接
// 拥有自己的sockfd,用户级输入输出缓冲区, 读写异常事件的注册方法/回调函数
// 还有一个回指向EpollServer的指针/EpollServer设置的对应的回调函数
class Connection
{
public:
Connection(int fd, EpollServer *epoll_server)
: _sockfd(fd), _epoll_server(epoll_server) {}
~Connection()
{
close(_sockfd);
}
void add_outbuffer(const string &buf)
{
_out_buffer += buf;
}
string &get_inbuffer()
{
return _in_buffer;
}
string &get_outbuffer()
{
return _out_buffer;
}
func_t getreader()
{
return _reader;
}
func_t getwriter()
{
return _writer;
}
func_t getexcepter()
{
return _excepter;
}
void registerCallback(func_t reader = nullptr, func_t writer = nullptr, func_t excepter = nullptr)
{
_reader = reader ? reader : _reader;
_writer = writer ? writer : _writer;
_excepter = _excepter ? excepter : _excepter;
}
void deregisterCallback(bool reader = false, bool writer = false, bool excepter = false)
{
if (reader)
_reader = nullptr;
if (writer)
_writer = nullptr;
if (excepter)
_excepter = nullptr;
}
int getfd()
{
return _sockfd;
}
EpollServer *getEpollServer()
{
return _epoll_server;
}
private:
int _sockfd;
string _in_buffer; // 有问题,只能处理文本,无法处理二进制(比如:图片,视频....)
string _out_buffer; // TO BE MODIFY
// 回调函数: 读,写,异常
func_t _reader;
func_t _writer;
func_t _excepter;
EpollServer *_epoll_server;
};
我们先暂且把Callback,Request,Response,BusinessProcessing放到同一个文件中: handler.hpp
不过我们要知道,它们将来必须是要分开的
5.EpollServer的修改并将监听套接字添加进去
Connection封装了一个单独的连接,并提供了回调函数的get和set方法,我们的EpollServer也就要修改了
又因为:
将connection和handler提出去之后,我们就可以在.cpp文件当中提前创建/绑定监听套接字,然后设置回调,添加到EpollServer当中
我们的EpollServer要能够提供这么几个功能:
- 建立和关闭连接 – connection
- 设置和取消关心 – epoll
- 注册和取消回调 – connection当中的reader,writer,excepter
这些代码以大家现在的水平应该一看就能看懂,我们重点是在思想上和代码上,走一下大佬们曾经走过的路
(当然,大佬是边走边建立道路, 我们是边走边欣赏,感叹道路建设当中的优雅与强大)
下面我们添加监听套接字: 建立一个.cpp文件
- 温馨提示: 我们用epoll的ET模式
因此: 我们的accept,read,write都要用非阻塞方式来进行
所以我们拿出我们之前一劳永逸的代码: 直接将fd设置为非阻塞状态
// 非阻塞式IO
void ModifyFdToNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL); // 获取fd的状态给fl
if (fl < 0)
{
LOG_SCREEN(FATAL) << "ModifyFdToNonBlock(int fd) error , errno: " << errno << " , strerror: " << strerror(errno) << "\n";
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 将fd设置为非阻塞状态
}
我们就不用Socket了,直接用原生套接字接口
6.小演示
跟我们一开始的样子一样,发一下|Live on hope.||Give me time.|等等这些短语验证一下,没问题我们就开始写一个小业务了(主要是自定义应用层协议解决一下粘包问题和序列反序列化问题)
完美,下面我们把它们分开吧
二.协议与业务登场
1.协议,业务,解决粘包,序列反序列化等等的函数模块实现
我们要实现一个简单的英汉互译服务器,
协议是这样的,采用的是LV(length+Value)方式来解决粘包问题 len\n0 单词\n
0和单词之间以空格作为分割符,进行序列反序列化
徒说无益,直接给代码了,没啥难的
// 英汉互译服务器
#pragma once
#include <string>
#include <iostream>
using namespace std;
const char Sep = '\n';
// len\n字符串\n
class Codec
{
public:
static void Encode(string &str)
{
string encode_str = to_string(str.size()) + Sep + str;
str = encode_str;
}
static bool Decode(string &str, string *return_str)
{
// 先找\n
size_t start = 0, pos = str.find(Sep);
if (pos == string::npos)
{
return false;
}
// 1. 取出len
size_t len = stoi(str.substr(start, pos - start));
// 2. start往后走,越过\r\n
start = pos + 1;
// 3. pos往后走len个长度
pos = start + len;
// 4. 看pos是否不够
if (pos >= str.size()+1)
{
return false;
}
// 5.没有越界,则截取字符串,并erase str
*return_str = str.substr(start, len);
str.erase(0,pos);
return true;
}
};
enum TranslateMode
{
ETOC, // 英译汉
CTOE // 汉译英
};
// len\n0 字符串\n
//"0"表示英译汉
//"1"表示汉译英
// 其余的直接丢弃
class Request
{
public:
Request() = default;
Request(TranslateMode mode, const string &str)
: _mode(mode)
{
if (_mode == ETOC)
_english = str;
else
_chinese = str;
}
string Serialize()
{
if (_mode == ETOC)
{
return "0 " + _english;
}
else
return "1 " + _chinese;
}
bool Deserialize(const string &str)
{
// 找空格,分割即可
size_t pos = str.find(" ");
if (pos == string::npos || str.size() <= 2)
return false;
if (str.substr(0, pos) == "0")
{
_english = str.substr(pos + 1);
_mode = ETOC;
return true;
}
else if (str.substr(0, pos) == "1")
{
_chinese = str.substr(pos + 1);
_mode = CTOE;
return true;
}
return false;
}
bool etoc() const
{
return _mode == ETOC;
}
const string &str() const
{
if (etoc())
return _english;
return _chinese;
}
void setmode(TranslateMode mode)
{
_mode = mode;
}
void setstr(const string &s)
{
if (etoc())
_english = s;
else
_chinese = s;
}
private:
string _english;
string _chinese;
TranslateMode _mode;
};
class Response
{
public:
Response() = default;
Response(const string &str)
: _str(str) {}
string Serialize()
{
return _str;
}
bool Deserialize(const string &str)
{
_str = str;
return true;
}
void Setans(const string &str)
{
_str = str;
}
private:
string _str;
};
至于业务:
#pragma once
#include <unordered_map>
using namespace std;
#include "Protocal.hpp"
class Translater
{
public:
static Response Translate(const Request &req)
{
static unordered_map<string, string> umap_etoc = {
{"hello", "你好"}, {"dp", "一生之敌"}, {"ac", "恭喜"}, {"hello world", "你好,世界"}};
static unordered_map<string, string> umap_ctoe = {
{"你好", "hello"}, {"一生之敌", "dp"}, {"恭喜", "ac"}, {"你好,世界", "hello world"}};
bool etoc = req.etoc();
string ans = "Not Found";
if (etoc)
{
if (umap_etoc.count(req.str()))
{
ans = umap_etoc[req.str()];
}
}
else
{
if (umap_ctoe.count(req.str()))
{
ans = umap_etoc[req.str()];
}
}
Response resp;
resp.Setans(ans);
return resp;
}
};
至于解决粘包,反序列化,拿到Request,交给业务层处理,拿到Response,序列化,封装报头这些任务依然需要搞一个文件
PacketProcessor.hpp
#pragma once
#include "Translate.hpp"
#include "Log.hpp"
#include <unistd.h>
class PacketProcessor
{
public:
static string getProcessedMessage(string &inbuffer)
{
string outstr;
string tmpstr;
// 1.解决粘包
while (Codec::Decode(inbuffer, &tmpstr))
{
// 2.反序列化
Request req;
if (!req.Deserialize(tmpstr))
{
LOG_SCREEN(FATAL) << "request 反序列化失败,该报文直接丢弃\n";
return outstr;
}
// 3.交由业务层处理
Response resp = Translater::Translate(req);
// 4.encode
string s = resp.Serialize();
Codec::Encode(s);
// 5.将返回结果添加到outstr当中
outstr += s;
}
return outstr;
}
};
业务相关代码处理完了,下面重点就是reader,writer,excepter函数的修改了
2.读写异常事件的关心策略
大家也都写了一年多代码了,结合我们的编程经验,我们也都可以得出: IO操作当中,读是最最最容易阻塞的,
因此读事件一般都要关心,而写事件很少阻塞(除了学管道的时候见过),因此写事件很少关心,而异常事件可以转为读写事件的关心,
而IO在读/写时也会发生异常,所以统一集中在读写时处理
对于写而言:
我们一般就是直接非阻塞式写,如果遇到errno==EAGAIN,说明发送缓冲区满了,此时才会设置写事件关心
如果用户级发送缓冲区outbuff空了,说明全发过去了,取消写事件关心
注意: 一定不要影响到读事件的关心和回调
3.handler.hpp的修改
#pragma once
#include <string>
using namespace std;
#include "Epoll_server.hpp"
#include "common.hpp"
#include "PacketProcessor.hpp"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
struct Callback
{
static void reader(shared_ptr<Connection> conn)
{
int fd = conn->getfd();
string &inbuff = conn->get_inbuffer();
char buf[1024];
int num;
while (true)
{
errno = 0;
num = recv(fd, buf, sizeof(buf) - 1, 0);
if (num >= 0)
{
buf[num] = 0;
inbuff += buf; // 不要忘了拼接到接收缓冲区中
if (num == 0) // 说明client关闭了写,那么我们只需要把该发给client的数据发完就OK了,无需在读了
break;
}
else
{
if (errno == EAGAIN) // 说明缓冲区为空了,此时退出循环即可
break;
if (errno == EINTR) // 说明读过程收到了信号打断,continue重新读
continue;
// 说明recv失败,打印日志
LOG_SCREEN(ERROR) << "recv message fail!, errno: " << errno << " strerror: " << strerror(errno) << "\n";
excepter(conn); // 调用excepter异常处理(断开连接)
return; // 回来之后马上return
}
}
// 走到这里,说明recv成功,要不然就是接收缓冲区空了,要不然就是client关闭写了
// 无论是哪种,先交给PackProcessor.hpp解决粘包,反序列化拿到Request,交由业务层处理,拿到Response,序列化,封装报头发回来,
// 我们放到发送缓冲区调用writer进行发送
string send_str = PacketProcessor::getProcessedMessage(inbuff);
conn->add_outbuffer(send_str);
writer(conn);
// 还会回来 , 如果num==0 并且发送缓冲区为空,那么通常情况下断开连接即可
if (num == 0)
{
while (!conn->get_outbuffer().empty())
writer(conn); // 只要发送缓冲区还有数据就一直调用
LOG_SCREEN(INFO) << "client exit, send him message finish...\n";
excepter(conn);
}
}
static void writer(shared_ptr<Connection> conn)
{
int fd = conn->getfd();
string &outbuff = conn->get_outbuffer();
while (true)
{
errno = 0;
// 发回去
int num = send(fd, outbuff.c_str(), outbuff.size(), 0);
if (num >= 0)
{
if (num == 0) // 说明client关闭写,通常情况下我们直接关闭跟client的连接即可
{
LOG_SCREEN(INFO) << "client close his writer_interface\n";
excepter(conn);
return; // 回来时直接返回
}
outbuff.erase(0, num); // 发成功的话,要把实际发出的信息从outbuff删除掉
if (outbuff.empty())
break; // 全发完了,直接break即可
}
else
{
if (errno == EAGAIN) // 说明发送缓冲区满了,break即可
break;
if (errno == EINTR)
continue;
// 说明send失败,打印日志
LOG_SCREEN(ERROR) << "send message fail!, errno: " << errno
<< " strerror: " << strerror(errno) << "\n";
excepter(conn); // 调用excepter异常处理(断开连接)
return; // 回来之后马上return
}
}
// 走到这里只有2种情况: 要么发送缓冲区满了 我们需要设置写事件关心,要么发完了 我们需要取消写事件关心
if (outbuff.empty())
{
LOG_SCREEN(INFO) << "outbuff发完了,去取消写事件关心与回调\n";
conn->getEpollServer()->deregisterCallback(fd, false, true);
}
else
{
LOG_SCREEN(INFO) << "缓冲区满了,设置写事件关心与回调\n";
conn->getEpollServer()->setupCare(fd, true, true);
conn->getEpollServer()->registerCallback(fd, &Callback::reader, &Callback::writer);
}
}
static void excepter(shared_ptr<Connection> conn)
{
int fd = conn->getfd();
EpollServer *es = conn->getEpollServer();
// 1. 取消回调
es->deregisterCallback(fd, true, true, true);
// 2. 取消关心
es->teardownCare(fd);
// 3. 关闭连接
es->shutdownConnection(fd);
LOG_SCREEN(INFO) << "关闭连接, client's fd: " << fd << "\n";
}
static void accepter(shared_ptr<Connection> conn)
{
int listenfd = conn->getfd();
while (true)
{
errno = 0;
sockaddr_in srcaddr;
socklen_t len = sizeof(srcaddr);
int newfd = ::accept(listenfd, Conv(&srcaddr), &len);
if (newfd >= 0)
{
LOG_SCREEN(INFO) << "accept success, newfd: " << newfd << "\n";
ModifyFdToNonBlock(newfd);
// 说明有新套接字出现,要用Epoll_server来添加
conn->getEpollServer()->buildConnection(newfd); // 1.建议连接
conn->getEpollServer()->setupCare(newfd, true); // 2.设置读事件的关心
conn->getEpollServer()->registerCallback(newfd, &Callback::reader); // 3.注册回调
}
else
{
if (errno == EAGAIN)
return;
if (errno == EINTR)
continue;
else
{
LOG_SCREEN(ERROR) << "accept fail, errno: "
<< errno << ", strerror(errno) : " << strerror(errno) << "\n";
break;
}
}
}
}
};
4.client.cpp的快速编写
因为telnet的每次回车都会给我们加上/r/n,所以就不好演示,因此与其费劲调telnet和协议,还不如自己写一个简单的普普通通的套接字client呢
我们采用两个新线程,一个负责读,一个负责写
直接上代码了,没啥难的
#include "Protocal.hpp"
#include <iostream>
using namespace std;
#include <vector>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstring>
#include "common.hpp"
#include <thread>
void sender(int sockfd)
{
int command;
vector<string> inv = {"0hello", "0dp", "0ac"};
while (true)
{
cout << "是否发送请求, 0代表不发送,直接退出,1代表发送: [0/1]\n";
cin >> command;
if (command == 0)
break;
for (auto &in : inv)
{
// 1.构建请求
Request req;
if (in[0] == '0')
{
req.setmode(ETOC);
}
else
req.setmode(CTOE);
req.setstr(in.substr(1));
// 2.序列化
string send_str = req.Serialize();
// 3.encode
Codec::Encode(send_str);
//cout << "发送数据: " << send_str;
send(sockfd, send_str.c_str(), send_str.size(), 0);
}
}
// 仅关闭写端
// int shutdown(int sockfd, int how); how 设为 SHUT_WR
LOG_SCREEN(INFO) << "发完数据啦,关闭套接字的写端!\n";
shutdown(sockfd, SHUT_WR);
}
void reader(int sockfd)
{
while (true)
{
string out;
int n;
// 1.读取响应
while (true)
{
char buf[1024];
n = recv(sockfd, buf, sizeof(buf) - 1, MSG_DONTWAIT);
if (n >= 0)
{
buf[n] = 0;
out += buf;
// cout << out << endl;
if (n == 0)
break;
}
else
{
if (errno == EAGAIN)
break;
if (errno == EINTR)
continue;
else
{
cout << "recv error, errno: " << errno << ", strerror(errno): " << strerror(errno) << "\n";
return;
}
}
}
// 2.解决数据包粘包
string return_str;
while (Codec::Decode(out, &return_str))
{
// 3.反序列化
Response resp;
if (!resp.Deserialize(return_str))
{
cout << "response 反序列化失败,该报文直接丢弃\n";
break;
}
// 打印即可
cout << "server回复: " << return_str << "\n";
}
if (n == 0)
break;
}
}
int main(int argc, char *argv[])
{
if (argc != 3)
{
cout << "Usage: " << argv[0] << " server_ip, server_port" << endl;
return 1;
}
string ip = argv[1];
uint16_t port = stoi(argv[2]);
// 1. 创建socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 2. connect
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
server_addr.sin_port = htons(port);
int n = connect(sockfd, Conv(&server_addr), sizeof(server_addr));
if (n < 0)
{
cout << "connect fail\n";
return 1;
}
thread t1(sender, sockfd);
thread t2(reader, sockfd);
t1.join();
t2.join();
return 0;
}
5.演示
验证成功
6.事件派发与提出reactor
到现在,我们成功解决了 本文一开始提出的 read的数据粘包,序列反序列化,写事件和异常事件怎么处理?的这个问题
下面我们回过头来看一下我们当初的EpollSever如今变成了什么样子
而正因如此,刚才的EpollServer被称为reactor模式(反应堆模式,也叫做半同步半异步模式)当中最核心的反应堆
反应堆体现在: 利用epoll的事件驱动机制+内部注册的回调函数实现回调函数调用的"自动化"与"连续化" , 就类似于核反应的感觉
半同步半异步模式体现在: 半异步: 事件的监听和通知与注册的函数调用是异步的
半同步体现在: 对应的recv和send接口依旧需要执行流主动调用,是一种IO的同步机制
因此: 我们改一下名字
7.完整代码
1.Reactor.hpp
#pragma once
#include "Log.hpp"
#include <memory>
#include <vector>
#include <cstring>
#include "Connection.hpp"
#include "Epoller.hpp"
class Reactor
{
public:
Epoller &epoll()
{
return _epoll;
}
Reactor(uint16_t port) : _port(port) {}
void dispatch()
{
while (true)
{
int n = _epoll.wait();
if (n > 0)
{
eventloop(n);
}
else
{
LOG_SCREEN(ERROR) << "epoll wait error, errno: " << errno
<< ", strerror: " << strerror(errno) << "\n";
}
}
}
void buildConnection(int fd)
{
_connection_map[fd] = make_shared<Connection>(fd, this);
}
void shutdownConnection(int fd)
{
_connection_map.erase(fd);
}
void setupCare(int fd, bool in = false, bool out = false, bool except = false)
{
_epoll.add_Epoll(fd, in, out, except);
}
void teardownCare(int fd)
{
_epoll.removefromEpoll(fd);
}
void registerCallback(int fd, func_t reader = nullptr, func_t writer = nullptr, func_t excepter = nullptr)
{
// 添加/修改
if (!isConnected(fd))
{
_connection_map[fd] = make_shared<Connection>(fd, this);
}
_connection_map[fd]->registerCallback(reader, writer, excepter);
}
void deregisterCallback(int fd, bool reader = false, bool writer = false, bool excepter = false)
{
// 查看在不在
if (!isConnected(fd))
return;
// 删除
_connection_map[fd]->deregisterCallback(reader, writer, excepter);
}
private:
bool isConnected(int fd)
{
return _connection_map.count(fd) == 1;
}
void eventloop(int n)
{
for (int i = 0; i < n; i++)
{
int fd = _epoll.getfd(i);
uint32_t event = _epoll.getevent(i);
if (!isConnected(fd))
continue;
shared_ptr<Connection> conn = _connection_map[fd];
// 面对异常,我们能做的只有断开连接
// 而读写时也可能会发生异常啊,因此我们把异常统一到一个地方去处理,更加优雅
if (event & EPOLLERR | event & EPOLLHUP)
{
event |= (EPOLLIN | EPOLLOUT);
}
if (event & EPOLLIN)
{
func_t reader = conn->getreader();
//不区分fd是监听套接字还是普通套接字,监听套接字对应的connection绑定的reader函数其实是accepter
if (reader == nullptr)
continue;
reader(conn);
}
if (event & EPOLLOUT)
{
func_t writer = conn->getwriter();
if (writer == nullptr)
continue;
writer(conn);
}
}
}
uint16_t _port;
Epoller _epoll;
unordered_map<int, shared_ptr<Connection>> _connection_map;
};
2. Connection.hpp
#pragma once
#include <functional>
#include <unistd.h>
#include <memory>
using namespace std;
class Reactor; // 前置声明
class Connection;
using func_t = function<void(shared_ptr<Connection>)>;
// connection负责维护一个具体的连接
// 拥有自己的sockfd,用户级输入输出缓冲区, 读写异常事件的注册方法/回调函数
// 还有一个回指向EpollServer的指针/EpollServer设置的对应的回调函数
class Connection
{
public:
Connection(int fd, Reactor *reactor)
: _sockfd(fd), _reactor(reactor) {}
~Connection()
{
close(_sockfd);
}
void add_outbuffer(const string &buf)
{
_out_buffer += buf;
}
string &get_inbuffer()
{
return _in_buffer;
}
string &get_outbuffer()
{
return _out_buffer;
}
func_t getreader()
{
return _reader;
}
func_t getwriter()
{
return _writer;
}
func_t getexcepter()
{
return _excepter;
}
void registerCallback(func_t reader = nullptr, func_t writer = nullptr, func_t excepter = nullptr)
{
_reader = reader ? reader : _reader;
_writer = writer ? writer : _writer;
_excepter = _excepter ? excepter : _excepter;
}
void deregisterCallback(bool reader = false, bool writer = false, bool excepter = false)
{
if (reader)
_reader = nullptr;
if (writer)
_writer = nullptr;
if (excepter)
_excepter = nullptr;
}
int getfd()
{
return _sockfd;
}
Reactor *reactor()
{
return _reactor;
}
private:
int _sockfd;
string _in_buffer; // 有问题,只能处理文本,无法处理二进制(比如:图片,视频....)
string _out_buffer; // TO BE MODIFY
// 回调函数: 读,写,异常
func_t _reader;
func_t _writer;
func_t _excepter;
Reactor *_reactor;
};
3. Epoller.hpp
#pragma once
#include <sys/epoll.h>
#include <vector>
#include <string>
using namespace std;
class Epoller
{
public:
// 默认阻塞式等待
Epoller(int timeout = -1) : _timeout(timeout)
{
_epfd = epoll_create(1);
}
void add_Epoll(int fd, bool in = false, bool out = false, bool except = false)
{
int i = _events_arr.size();
if (!_invalids.empty())
{
i = _invalids.back() - '0';
_invalids.pop_back();
}
else
_events_arr.push_back(epoll_event());
_events_arr[i].events = EPOLLET | (in ? EPOLLIN : 0) | (out ? EPOLLOUT : 0) | (except ? EPOLLERR | EPOLLHUP : 0);
_events_arr[i].data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &_events_arr[i]);
}
void removefromEpoll(int fd)
{
for (int i = 0; i < _events_arr.size(); i++)
{
if (_events_arr[i].data.fd == fd)
{
_events_arr[i].data.fd = -1;
_events_arr[i].events = 0;
_invalids.push_back(i); // 删除的时候添加到invalids当中
break;
}
}
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
}
int wait()
{
return epoll_wait(_epfd, &_events_arr[0], _events_arr.size(), _timeout);
}
int getfd(int index)
{
return _events_arr[index].data.fd;
}
uint32_t getevent(int index)
{
return _events_arr[index].events;
}
private:
int _epfd;
vector<struct epoll_event> _events_arr;
string _invalids;
int _timeout;
};
4. handler.hpp
#pragma once
#include <string>
using namespace std;
#include "Reactor.hpp"
#include "common.hpp"
#include "PacketProcessor.hpp"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
struct Handler
{
static void reader(shared_ptr<Connection> conn)
{
int fd = conn->getfd();
string &inbuff = conn->get_inbuffer();
char buf[1024];
int num;
while (true)
{
errno = 0;
num = recv(fd, buf, sizeof(buf) - 1, 0);
if (num >= 0)
{
buf[num] = 0;
inbuff += buf; // 不要忘了拼接到接收缓冲区中
if (num == 0) // 说明client关闭了写,那么我们只需要把该发给client的数据发完就OK了,无需在读了
break;
}
else
{
if (errno == EAGAIN) // 说明缓冲区为空了,此时退出循环即可
break;
if (errno == EINTR) // 说明读过程收到了信号打断,continue重新读
continue;
// 说明recv失败,打印日志
LOG_SCREEN(ERROR) << "recv message fail!, errno: " << errno << " strerror: " << strerror(errno) << "\n";
excepter(conn); // 调用excepter异常处理(断开连接)
return; // 回来之后马上return
}
}
// 走到这里,说明recv成功,要不然就是接收缓冲区空了,要不然就是client关闭写了
// 无论是哪种,先交给PackProcessor.hpp解决粘包,反序列化拿到Request,交由业务层处理,拿到Response,序列化,封装报头发回来,
// 我们放到发送缓冲区调用writer进行发送
string send_str = PacketProcessor::getProcessedMessage(inbuff);
conn->add_outbuffer(send_str);
writer(conn);
// 还会回来 , 如果num==0 并且发送缓冲区为空,那么通常情况下断开连接即可
if (num == 0)
{
while (!conn->get_outbuffer().empty())
writer(conn); // 只要发送缓冲区还有数据就一直调用
LOG_SCREEN(INFO) << "client exit, send him message finish...\n";
excepter(conn);
}
}
static void writer(shared_ptr<Connection> conn)
{
int fd = conn->getfd();
string &outbuff = conn->get_outbuffer();
while (true)
{
errno = 0;
// 发回去
int num = send(fd, outbuff.c_str(), outbuff.size(), 0);
if (num >= 0)
{
if (num == 0) // 说明client关闭写,通常情况下我们直接关闭跟client的连接即可
{
LOG_SCREEN(INFO) << "client close his writer_interface\n";
excepter(conn);
return; // 回来时直接返回
}
outbuff.erase(0, num); // 发成功的话,要把实际发出的信息从outbuff删除掉
if (outbuff.empty())
break; // 全发完了,直接break即可
}
else
{
if (errno == EAGAIN) // 说明发送缓冲区满了,break即可
break;
if (errno == EINTR)
continue;
// 说明send失败,打印日志
LOG_SCREEN(ERROR) << "send message fail!, errno: " << errno
<< " strerror: " << strerror(errno) << "\n";
excepter(conn); // 调用excepter异常处理(断开连接)
return; // 回来之后马上return
}
}
// 走到这里只有2种情况: 要么发送缓冲区满了 我们需要设置写事件关心,要么发完了 我们需要取消写事件关心
if (outbuff.empty())
{
LOG_SCREEN(INFO) << "outbuff发完了,去取消写事件关心与回调\n";
conn->reactor()->deregisterCallback(fd, false, true);
}
else
{
LOG_SCREEN(INFO) << "缓冲区满了,设置写事件关心与回调\n";
conn->reactor()->setupCare(fd, true, true);
conn->reactor()->registerCallback(fd, &Handler::reader, &Handler::writer);
}
}
static void excepter(shared_ptr<Connection> conn)
{
int fd = conn->getfd();
// 1. 取消回调
conn->reactor()->deregisterCallback(fd, true, true, true);
// 2. 取消关心
conn->reactor()->teardownCare(fd);
// 3. 关闭连接
conn->reactor()->shutdownConnection(fd);
LOG_SCREEN(INFO) << "关闭连接, client's fd: " << fd << "\n";
}
static void accepter(shared_ptr<Connection> conn)
{
int listenfd = conn->getfd();
while (true)
{
errno = 0;
sockaddr_in srcaddr;
socklen_t len = sizeof(srcaddr);
int newfd = ::accept(listenfd, Conv(&srcaddr), &len);
if (newfd >= 0)
{
LOG_SCREEN(INFO) << "accept success, newfd: " << newfd << "\n";
ModifyFdToNonBlock(newfd);
// 说明有新套接字出现,要用Epoll_server来添加
conn->reactor()->buildConnection(newfd); // 1.建议连接
conn->reactor()->setupCare(newfd, true); // 2.设置读事件的关心
conn->reactor()->registerCallback(newfd, &Handler::reader); // 3.注册回调
}
else
{
if (errno == EAGAIN)
return;
if (errno == EINTR)
continue;
else
{
LOG_SCREEN(ERROR) << "accept fail, errno: "
<< errno << ", strerror(errno) : " << strerror(errno) << "\n";
break;
}
}
}
}
};
5. PacketProcesser.hpp
#pragma once
#include "Translate.hpp"
#include "Log.hpp"
#include <unistd.h>
class PacketProcessor
{
public:
static string getProcessedMessage(string &inbuffer)
{
string outstr;
string tmpstr;
// 1.解决粘包
while (Codec::Decode(inbuffer, &tmpstr))
{
// 2.反序列化
Request req;
if (!req.Deserialize(tmpstr))
{
LOG_SCREEN(FATAL) << "request 反序列化失败,该报文直接丢弃\n";
return outstr;
}
// 3.交由业务层处理
Response resp = Translater::Translate(req);
// 4.encode
string s = resp.Serialize();
Codec::Encode(s);
// 5.将返回结果添加到outstr当中
outstr += s;
}
return outstr;
}
};
6.协议和业务: Protocal.hpp Translate.hpp
Protocal.hpp
// 英汉互译服务器
#pragma once
#include <string>
#include <iostream>
using namespace std;
const char Sep = '\n';
// len\n字符串\n
class Codec
{
public:
static void Encode(string &str)
{
string encode_str = to_string(str.size()) + Sep + str;
str = encode_str;
}
static bool Decode(string &str, string *return_str)
{
// 先找\n
size_t start = 0, pos = str.find(Sep);
if (pos == string::npos)
{
return false;
}
// 1. 取出len
size_t len = stoi(str.substr(start, pos - start));
// 2. start往后走,越过\r\n
start = pos + 1;
// 3. pos往后走len个长度
pos = start + len;
// 4. 看pos是否不够
if (pos >= str.size()+1)
{
return false;
}
// 5.没有越界,则截取字符串,并erase str
*return_str = str.substr(start, len);
str.erase(0,pos);
return true;
}
};
enum TranslateMode
{
ETOC, // 英译汉
CTOE // 汉译英
};
// len\n0 字符串\n
//"0"表示英译汉
//"1"表示汉译英
// 其余的直接丢弃
class Request
{
public:
Request() = default;
Request(TranslateMode mode, const string &str)
: _mode(mode)
{
if (_mode == ETOC)
_english = str;
else
_chinese = str;
}
string Serialize()
{
if (_mode == ETOC)
{
return "0 " + _english;
}
else
return "1 " + _chinese;
}
bool Deserialize(const string &str)
{
// 找空格,分割即可
size_t pos = str.find(" ");
if (pos == string::npos || str.size() <= 2)
return false;
if (str.substr(0, pos) == "0")
{
_english = str.substr(pos + 1);
_mode = ETOC;
return true;
}
else if (str.substr(0, pos) == "1")
{
_chinese = str.substr(pos + 1);
_mode = CTOE;
return true;
}
return false;
}
bool etoc() const
{
return _mode == ETOC;
}
const string &str() const
{
if (etoc())
return _english;
return _chinese;
}
void setmode(TranslateMode mode)
{
_mode = mode;
}
void setstr(const string &s)
{
if (etoc())
_english = s;
else
_chinese = s;
}
private:
string _english;
string _chinese;
TranslateMode _mode;
};
class Response
{
public:
Response() = default;
Response(const string &str)
: _str(str) {}
string Serialize()
{
return _str;
}
bool Deserialize(const string &str)
{
_str = str;
return true;
}
void Setans(const string &str)
{
_str = str;
}
private:
string _str;
};
Translate.hpp:
#pragma once
#include <unordered_map>
using namespace std;
#include "Protocal.hpp"
class Translater
{
public:
static Response Translate(const Request &req)
{
static unordered_map<string, string> umap_etoc = {
{"hello", "你好"}, {"dp", "一生之敌"}, {"ac", "恭喜"}, {"hello world", "你好,世界"}};
static unordered_map<string, string> umap_ctoe = {
{"你好", "hello"}, {"一生之敌", "dp"}, {"恭喜", "ac"}, {"你好,世界", "hello world"}};
bool etoc = req.etoc();
string ans = "Not Found";
if (etoc)
{
if (umap_etoc.count(req.str()))
{
ans = umap_etoc[req.str()];
}
}
else
{
if (umap_ctoe.count(req.str()))
{
ans = umap_etoc[req.str()];
}
}
Response resp;
resp.Setans(ans);
return resp;
}
};
7.cpp文件: server和client
server.cpp
#include "handler.hpp"
#include "common.hpp"
const int defaultBacklog = 5;
int getListenSock(uint16_t port)
{
// 1.设置监听套接字
int listensock = socket(AF_INET, SOCK_STREAM, 0);
// 2.绑定监听套接字
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (::bind(listensock, Conv(&addr), sizeof(addr)) == -1)
{
LOG_SCREEN(FATAL) << "bind fail , port: " << port << "\n";
exit(1);
}
LOG_SCREEN(DEBUG) << "bind success\n";
// 3.进行监听
if (listen(listensock, defaultBacklog) == -1)
{
LOG_SCREEN(FATAL) << "listen fail , port: " << port << "\n";
exit(1);
}
LOG_SCREEN(DEBUG) << "listen success\n";
return listensock;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
cout << "Usage: " << argv[0] << " server_port\n";
return 1;
}
uint16_t port = stoi(argv[1]);
// 1. 创建监听套接字,并设为非阻塞
int listensock = getListenSock(port);
ModifyFdToNonBlock(listensock);
// 2. 注册监听套接字
Reactor svr(port);
svr.setupCare(listensock, true);
svr.registerCallback(listensock, &Handler::accepter);
// 3. dispatch就完事了
svr.dispatch();
return 0;
}
client.cpp
#include "Protocal.hpp"
#include <iostream>
using namespace std;
#include <vector>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstring>
#include "common.hpp"
#include <thread>
void sender(int sockfd)
{
int command;
vector<string> inv = {"0hello", "0dp", "0ac"};
while (true)
{
cout << "是否发送请求, 0代表不发送,直接退出,1代表发送: [0/1]\n";
cin >> command;
if (command == 0)
break;
for (auto &in : inv)
{
// 1.构建请求
Request req;
if (in[0] == '0')
{
req.setmode(ETOC);
}
else
req.setmode(CTOE);
req.setstr(in.substr(1));
// 2.序列化
string send_str = req.Serialize();
// 3.encode
Codec::Encode(send_str);
//cout << "发送数据: " << send_str;
send(sockfd, send_str.c_str(), send_str.size(), 0);
}
}
// 仅关闭写端
// int shutdown(int sockfd, int how); how 设为 SHUT_WR
LOG_SCREEN(INFO) << "发完数据啦,关闭套接字的写端!\n";
shutdown(sockfd, SHUT_WR);
}
void reader(int sockfd)
{
while (true)
{
string out;
int n;
// 1.读取响应
while (true)
{
char buf[1024];
n = recv(sockfd, buf, sizeof(buf) - 1, MSG_DONTWAIT);
if (n >= 0)
{
buf[n] = 0;
out += buf;
// cout << out << endl;
if (n == 0)
break;
}
else
{
if (errno == EAGAIN)
break;
if (errno == EINTR)
continue;
else
{
cout << "recv error, errno: " << errno << ", strerror(errno): " << strerror(errno) << "\n";
return;
}
}
}
// 2.解决数据包粘包
string return_str;
while (Codec::Decode(out, &return_str))
{
// 3.反序列化
Response resp;
if (!resp.Deserialize(return_str))
{
cout << "response 反序列化失败,该报文直接丢弃\n";
break;
}
// 打印即可
cout << "server回复: " << return_str << "\n";
}
if (n == 0)
break;
}
}
int main(int argc, char *argv[])
{
if (argc != 3)
{
cout << "Usage: " << argv[0] << " server_ip, server_port" << endl;
return 1;
}
string ip = argv[1];
uint16_t port = stoi(argv[2]);
// 1. 创建socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 2. connect
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
server_addr.sin_port = htons(port);
int n = connect(sockfd, Conv(&server_addr), sizeof(server_addr));
if (n < 0)
{
cout << "connect fail\n";
return 1;
}
thread t1(sender, sockfd);
thread t2(reader, sockfd);
t1.join();
t2.join();
return 0;
}
common.cpp
#pragma once
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "Log.hpp"
#define Conv(addr) (reinterpret_cast<struct sockaddr *>(addr))
// 非阻塞式IO
void ModifyFdToNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL); // 获取fd的状态给fl
if (fl < 0)
{
LOG_SCREEN(FATAL) << "ModifyFdToNonBlock(int fd) error , errno: " << errno << " , strerror: " << strerror(errno) << "\n";
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 将fd设置为非阻塞状态
}
三.画图进一步理解reactor
画了两张图,大家看一下
1.版本1
2.版本2
仅仅1087行代码就能实现一个简单的reactor模式了
3.小总结
reactor模式主要包括:
- Reactor反应堆(核心)
- Connection(维护每个连接[fd]与其用户级inbuff,outbuff,还有回调函数)
- Epoller(封装epoll进行多路转接)
- Handler(Connection对应的回调函数)
再往上就是具体的业务处理模块了: 协议层和业务层
以上就是网络编程: reactor模式的步步探索与实现的全部内容,希望能对大家有所帮助!!