目录
- 基础操作
- 端点的生成
- 创建socket
- 服务端创建acceptor用于监听传入的连接请求 并 接受连接
- 服务器绑定端口
- 客户端通过ip连接服务端 connect
- 客户端通过域名连接服务端
- 服务器监听 接受连接
- 读写buffer
- 同步的读写
- 实现一个同步读写的服务器与客户端应答
- 异步读写函数
- 异步写
- 异步读
- 官方异步Server读写示例
- 问题处理改进
- 使用lambda表达式和智能指针延长连接的生命周期
- 增加发送队列实现全双工通信
- 处理粘包问题
- io_context:在不同平台有不同的实现
- 字节序问题
完整示例代码
booast::asio和boost::asio::ip
boost::asio提供了同步和异步的connect、read、write函数。这些函数有2套实现,一个是出错时抛异常,一个是传入错误码
boost::asio::ip提供了对ip地址、端点的封装,当创建套接字时需要传入io_context,并提供了套接字属性控制函数(类似fcntl)
cmake构建时需要的组件 boost::system boost::regex
基础操作
端点的生成
int client_end_point() {
//1.对端ip string转address
std::string raw_ip_address = "127.4.8.1";
unsigned short port_num = 3333;
boost::system::error_code ec;
asio::ip::address ip_address = asio::ip::address::from_string(raw_ip_address, ec);
if (ec.value() != 0)//转换失败
{
std::cout << "解析ip失败 " << ec.message() << std::endl;
return ec.value();
}
//2. 生成端点
asio::ip::tcp::endpoint ep(ip_address, port_num);
return 0;
}
int server_end_point() {
unsigned short port_num = 3333;
asio::ip::address ip_address = asio::ip::address_v6::any();
asio::ip::tcp::endpoint ep(ip_address, port_num);
return 0;
}
创建socket
传入 上下文asio::io_context,使socket能够注册自己的事件,在事件循环中得到处理
asio::io_context ioc;
asio::ip::tcp protocol = asio::ip::tcp::v4();
asio::ip::tcp::socket sock(ioc);
服务端创建acceptor用于监听传入的连接请求 并 接受连接
和创建socket一样,老版本需要手动调用open打开一下:acceptor.open(protocol, ec)。
asio::io_context ios;
asio::ip::tcp::acceptor acceptor(ios);
服务器绑定端口
如下写法相当于执行了bind:服务器端 3333端口 接收ipv4连接
asio::io_context ios;
asio::ip::tcp::endpoint ep(asio::ip::tcp::v4(), 3333);
asio::ip::tcp::acceptor acceptor(ios, ep);
旧版写法如下
//1.创建端点
unsigned short port_num = 3333;
asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(), port_num);
//2.创建acceptor
asio::io_context ios;
asio::ip::tcp::acceptor acceptor(ios, ep.protocol());
//3.bind
boost::system::error_code ec;
acceptor.bind(ep, ec);
异步的accept:acceptor.async_accept(error_code)
客户端通过ip连接服务端 connect
int connect_to_end()
{
//服务器的ip和端口
std::string raw_ip_address = "192.168.1.124";
unsigned short port_num = 3333;
try
{
//1.创建socket
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_context ios;
asio::ip::tcp::socket sock(ios, ep.protocol());//ep.protocol()就是ipv4
//2.连接到端点
sock.connect(ep);
}
catch (boost::system::system_error& e)
{
std::cout << e.what() << std::endl;
}
return 0;
}
客户端通过域名连接服务端
int dns_connect_to_end()
{
std::string host = "samplehost";//域名
std::string port_num = "3333";
//1.创建dns解析器
asio::io_context ios;
asio::ip::tcp::resolver::query resolver_query(host, port_num, asio::ip::tcp::resolver::query::numeric_service);
asio::ip::tcp::resolver resolver(ios);//用于执行解析的解析器对象
try
{//由于负载均衡,一个域名可以对应多个IP
//2.解析域名
asio::ip::tcp::resolver::iterator it = resolver.resolve(resolver_query);
//3.创建socket
asio::ip::tcp::socket sock(ios);
//4.连接服务端:使用全局的connect而不是sock.connet,那个方法没有提供迭代器版本
asio::connect(sock, it);
}
catch (boost::system::system_error& e)
{
std::cout << e.what() << std::endl;
}
return 0;
}
服务器监听 接受连接
int accept_new_connection()
{
//1.创建端点
const int BACKLOG_SIZE = 30;//监听队列大小
unsigned short port_num = 3333;
asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(), port_num);
asio::io_context ios;
try
{
//2.创建acceptor
asio::ip::tcp::acceptor acceptor(ios, ep.protocol());
//3.bind
acceptor.bind(ep);
//4.监听
acceptor.listen(BACKLOG_SIZE);
//5.接受连接 并 创建通信socket
asio::ip::tcp::socket sock(ios);//通信socket
acceptor.accept(sock);
}
catch (boost::system::system_error& e)
{
std::cout << e.what() << std::endl;
}
return 0;
}
读写buffer
std::size_t send(const ConstBufferSequence & buffers)
-
数据结构
mutablexxx用于写服务, constxxx用于读服务
MutableBufferSequence 就是 std::vector<asio::mutable_buffer >
ConstBufferSequence 就是 std::vector<asio::const_buffer >
mutable_buffer、const_buffer 首字节存储数据长度,后续存储具体数据 -
为了免去自行组装MutableBufferSequence的麻烦,提供了buffer()函数
自行组装: //string->char*->const_buffer std::string buf = "hello"; asio::const_buffer asio_buf(buf.c_str(), buf.length()); //把const_buffer放到vector容器中 std::vector<asio::const_buffer> buffer_sequence; buffer_sequence.push_back(asio_buf); //发送 //...
-
buffer函数:返回const_buffers_1类型,可以直接用于asio的各种函数
asio::const_buffers_1 output_buf = asio::buffer("hello");
-
char数组转buffer
//1.使用智能指针来管理char数组 const size_t BUF_SIZE_BYTES = 20; std::unique_ptr<char[]> buf(new char[BUF_SIZE_BYTES]); //2.转为xx_buffer_1 auto input_buf = asio::buffer(static_cast<void*>(buf.get()), BUF_SIZE_BYTES);
同步的读写
- write_some:发送尽量多的数据,但可能不会全部发送。函数返回实际发送的字节数
循环发送剩余数据,直至发送完毕
客户端向服务器同步的发送数据:std::string buf = "hello"; std::size_t total_bytes_written = 0; while (total_bytes_written != buf.length()) { total_bytes_written += sock.write_some(asio::buffer(buf.c_str()+ total_bytes_written, buf.length()-total_bytes_written)); }
//1.产生端点 asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num); //2.创建套接字 asio::io_context ioc; asio::ip::tcp::socket sock(ioc, ep.protocol()); //3.连接 sock.connect(ep); //4.发送数据 sock.write_some...
- send:一次性全部发送,没发完就阻塞在那,直至全发完为止
//4.发送数据 std::string buf = "hello"; int send_length = sock.send(asio::buffer(buf.c_str(), buf.length())); //<0为错误 =0为对端关闭
- write:一次性全部发送,没发完就阻塞在那,直至全发完为止
int send_length = asio::write(sock, asio::buffer(buf.c_str(), buf.length()));
- read_some
const unsigned short MESSAGE_SIZE = 7; char buf[MESSAGE_SIZE]; std::size_t total_bytes_read = 0; while (total_bytes_read != MESSAGE_SIZE) { total_bytes_read += sock.read_some(asio::buffer(buf + total_bytes_read, MESSAGE_SIZE - total_bytes_read)); }
- receive
sock.receive(asio::buffer(buffer_receive, BUFF_SIZE))
- read
asio::read(sock, asio::buffer(buffer_receive, BUFF_SIZE));
实现一个同步读写的服务器与客户端应答
-
客户端
-
连接
创建上下文服务并传给套接字sock,使用sock调用connect函数发起连接,函数中传入对端endpoint
- 收发
boost::asio::write(sock, boost::asio::buffer(char*数组, 长度)) boost::asio::read(sock, boost::asio::buffer(char*数组, 长度))
-
-
服务端
- 阻塞等待新连接
tcp::acceptor a(io_context, tcp::endpoint(tcp::v4(), port)) socket_ptr socket(new tcp::socket(io_context));//创建socket,并用智能指针管理 a.accept(*socket);
- 使用子线程与对方会话
auto t = std::make_shared<std::thread>(session, socket);//创建线程用于执行会话:std::thread t(session, socket); thread_set.insert(t);//为了避免指针t的提前释放,将其存在容器中,这样直到容器释放时才会释放 //在session函数中实现具体的收发...
- 阻塞等待新连接
异步读写函数
异步写
-
异步写 async_write_some(buffer, 回调函数):一次可能不会发送完,因为TCP发送缓冲区大小和用户要求发送的数据大小不一致。
如下在回调函数中处理剩余未发送的数据,判读已发送数据的大小,如果未发送完,就再次调用async_write_some异步写。
_socket->async_write_some(...WriteCallBackErr...); //异步写操作 的 回调函数 bytes_transferred:本次async_write_some发送的长度 send_node:此前已经发送的长度 void Session::WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode> send_node) { //被设置为异步写async_write_some的回调函数,这样在执行了async_write_some后就会调用该函数 //由于async_write_some不一定一次将所有数据写完,这里再次调用异步写 if (bytes_transferred + send_node->_cur_len < send_node->_total_len) { send_node->_cur_len += bytes_transferred; _socket->async_write_some(剩余... WriteCallBackErr...); } }
-
问题:上述代码在回调函数中处理未发送完的数据,如果此时恰好用户重新发起了新的异步写请求,就会出现数据顺序混乱。实际工作中使用队列来保证应用层的发送请求顺序
std::queue<std::shared_ptr<MsgNode>> _send_queue;//发送队列,避免用户请求因async_write_some的多次写而顺序混乱 bool _send_pending;//true表示一个节点还未发送完,默认初始化为false
解决:每当用户发起新的write请求时,先将要发送的信息存入队列,然后判断当前节点是否发送完毕,如果发送完毕了,就async_write_some执行新的异步写
void Session::WriteToSocket(const std::string& buf) { _send_queue.emplace(new MsgNode(buf.c_str(), buf.length())); if (_send_pending)//当前节点未发送完,暂时不要调用异步写 { return; } //发送新节点的异步写请求 _socket->async_write_some(boost::asio::buffer(buf), [this](const boost::system::error_code& ec, std::size_t bytes_transferred) { WriteCallBack(ec, bytes_transferred); }); _send_pending = true; }
回调函数WriteCallBack:尚未发完就取队头节点继续发 否则取新节点发
//1.取队头元素(当前正在处理),如果尚未发送完,就继续发送 auto& send_data = _send_queue.front(); if (...) { _socket->async_write_some( ...WriteCallBack(ec, bytes_transferred)); return; } //2.如果已经发送完了,就队头元素出队 _send_queue.pop(); //3.发送队列里的其它节点 if (_send_queue.empty()) _send_pending = false; else//异步写新节点 { auto& send_data = _send_queue.front(); _socket->async_write_some( ...WriteCallBack(ec, bytes_transferred)); return; }
总结:
-
使用async_send一次性发送数据:实际上async_send就是多次调用async_write_some
写法和上述一样,只不过回调函数中无需处理当前节点尚未发送完毕的情形
异步读
-
异步读 async_read_some、async_receive
void Session::ReadFromSocket() { ... _socket->async_read_some(boost::asio::buffer(...),ReadCallBack()); ... } //回调函数 void Session::ReadCallBack() { //如果还没读完:那就async_read_some继续读 _recv_node->_cur_len += bytes_transferred; if (...) { _socket->async_read_some(); return; } }
官方异步Server读写示例
-
在实际开发中,通常使用async_read_some异步读、async_send异步写。如下实现异步server
代码总结实现步骤:将服务器接收连接的步骤由Server类来实现,服务器读写客户端数据由Session会话类来实现
问题处理改进
使用lambda表达式和智能指针延长连接的生命周期
-
问题:假设不是echo服务器,设计在读完数据后再次启动async_read_some的话,如果恰好在async_send前客户端关闭了,就会导致触发读事件和写事件的err,使得this指针被delete2次。
//读回调函数 void Session::handle_read(...) { if (!error) { std::cout << "收到数据: " << _data << std::endl; _socket.async_send(...handle_write(er)...); } else { std::cout << error.message() << std::endl; delete this; } }
-
解决
为了管理各个会话session,实现如断线重连的功能,需要使用map来管理所有session,使用boost库自带的生成id方法。和客户端成功连接并调用异步读函数后,就将该session和对应id保存到map中。boost::uuids::uuid a_uuid = boost::uuids::random_generator()(); _uuid = boost::uuids::to_string(a_uuid); std::map<std::string, std::shared_ptr<Session>> _sessions;
在Server类中,为了避免执行async_accept的回调函数时new_session已经被释放了,如下将其通过lambda表达式的按值捕获复制,使其引用计数+1
void Server::start_accept() { std::shared_ptr<Session> new_session = std::make_shared<Session>(_ioc, this); _acceptor.async_accept(new_session->Socket(), [this, new_session](const boost::system::error_code& error) { handle_accept(new_session, error); }); }
在Session类中,Start调用异步读函数,handle_read作为回调函数被执行了以后,Start的lambda表达式中所拷贝的Session智能指针引用计数减一,为避免其释放,在handle_read函数中使用lambda表达式再次按值捕获智能指针。
下述代码解决了2次delete的问题,因为handle_read和handle_write的lambda表达式中都按值捕获,使得引用计数为2,单单handle_read出错只会导致其从Server类的map中移除,且引用计数减一,但还没到释放的地步。
//启动 void Session::Start() { //1.初始化清零数据 memset(_data, 0, max_length); //2.异步读 _socket.async_read_some(buffer(....), [this](....) { handle_read(error, bytes_transferred, shared_from_this()); }); } //读回调函数 void Session::handle_read(...., std::shared_ptr<Session> _self_shared) { if (!error) { _socket.async_send(buffer(),[this, _self_shared](....) { handle_write(....); }); } else { _sessions.erase(_uuid); } }
总之:使用map方便管理session,使用lambda表达式按值捕获避免了对象提前析构。session的智能指针计数最多为3,即handel_read、handel_write、map都有它时。
增加发送队列实现全双工通信
1.将数据组成节点放到队列中
2.执行async_send发送数据
3.回调函数判断队列中是否还有节点待发送,有则调用async_send发送
handle_write回调函数:队头元素已经发送完毕出队,如果队列不空则再次调用async_send发送数据
void Session::Send(char* msg, int max_length)
{
bool pending = false;
std::lock_guard<std::mutex> lock(_send_lock);
//队列上有数据待组装成节点发送
if (_send_que.size() > 0) pending = true;
_send_que.push(std::make_shared<MsgNode>(msg, max_length));
//当前有线程正在发送数据
if (pending) return;
//没有线程在发送数据了,必须自行调用async_send发送数据
_socket.async_send(buffer(....), ...handle_write(er, shared_from_this());
}
处理粘包问题
切包处理:也就是在应用层定义收发包格式,常使用tlv协议(tag+length+value)
下述代码构造节点保存length+value,以便实现简单的切包
MsgNode(char* msg, short max_len) :_total_len(max_len + HEAD_LENGTH), _cur_len(0)
{
_data = new char[_total_len + 1]();
memcpy(_data, &max_len, HEAD_LENGTH);//将消息体长度存储到_data里
memcpy(_data + HEAD_LENGTH, msg, max_len);//再存储消息体信息
_data[_total_len] = '\0';//最后添加上'\0'
}
- async_read_some读取到的数据都会首先会存放在_data 数组中,在回调函数中将此数据copy拆分切包。
- 为了便于拆分length数据和value数据,定义2个结构来分别保存它们
std::shared_ptr<MsgNode> _recv_msg_node;//收到的消息结构 std::shared_ptr<MsgNode> _recv_head_node;//收到的头部结构 bool _b_head_parse;//头部解析是否已完成
- 回调函数handle_read
- 如果本次async_read_some读取到的数据 加上 头部结构已有数据 仍不足2字节:就将_data中的数据memcpy到头部结构,清空_data,再次调用async_read_some
- 如果超过2字节:就先memcpy剩余头部结构数据,然后用头部结构->_data读取出length,该length就是后续value的字节大小。判断本次读取到的剩余数据大小是否足够length大小,如果不够就memcpy到消息结构,清空_data,再次调用async_read_some。如果足够就把length大小的数据memcpy到消息结构,并在末尾补上’\0’。
- 对于多余的数据,那是下一个length数据,因此把_b_head_parse置为false,再次async_read_some就会在回调函数中解析头部结构。
- 为了便于拆分length数据和value数据,定义2个结构来分别保存它们
io_context:在不同平台有不同的实现
在Windows平台实现了原生的proactor模式,其它平台则是通过reactor模拟proactor。这里的模拟也就是使用主线程来执行数据的读写,在用户态完成数据的复制,读写完成后通知工作线程,使得工作线程只需要处理业务逻辑,就像proactor一样。
当调用async_read异步读时,会向io_context注册读事件和读回调函数,
在应用层调用io_context.run执行事件循环,该死循环轮询事件发生,一旦事件发生就将对应的回调函数添加到就绪事件队列中,然后由单/多线程执取回调函数执行。
io_context将读回调函数放到事件队列里,在windows上使用的iocp模型,linux上使用的epoll模型,linux端epoll_wait等待事件发生,事件发生则将回调函数添加到就绪事件队列。
字节序问题
需要把主机字节序转为大端序(高位存到低地址):htonl
asio中为boost::asio::detail::socket_ops::host_to_network_long()