文章目录
- 1、简介
- 2、异步写 void AsyncWriteSomeToSocketErr(const std::string& buffer)
- 3、异步写void AsyncWriteSomeToSocket(const std::string& buffer)
- 4、异步写void AsyncSendToSocket(const std::string& buffer)
- 5、异步读void AsyncReadSomeToSocket(const std::string& buffer)
- 6、异步读void AsyncReceiveToSocket(const std::string& buffer)
- 7、总结
1、简介
本文介绍boost asio的异步读写操作及注意事项,为保证知识便于读者吸收,仅介绍api使用的代码片段。下一节再编写完整的客户端和服务器程序。
所以我们定义一个session类,这个session类表示服务器处理客户端连接的管理类
#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"
class Session {
public:
Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
bool Connect(boost::asio::ip::tcp::endpoint& ep);
private:
std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
std::shared_ptr<Buffer> send_buffer_;
};
#endif // !__ASYNC_DEMO_H_2023_8_22__
session类定义了一个socket成员变量,负责处理对端**(ip+端口)的连接读写,封装了Connect**函数:
#include"async_demo.h"
Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
:socket_(socket)
{
send_buffer_ = nullptr;
}
bool Session::Connect(boost::asio::ip::tcp::endpoint& ep) {
socket_->connect(ep);
return true;
}
这里只是简单意思一下,下面核心介绍异步读写api的使用。
2、异步写 void AsyncWriteSomeToSocketErr(const std::string& buffer)
在写操作前,我们先封装一个Buffer结构。用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)
写了两个构造函数,两个参数的负责构造写节点,一个参数的负责构造读节点。
#pragma once
#include<iostream>
//trv
const int RECVSIZE = 1024;
class Buffer {
public:
//发送消息协议
//param 协议首地址,协议总长度
Buffer(const char* msg,int32_t total_len)
:msg_(new char[total_len])
,total_len_(total_len)
,cur_len_(0)
{
memcpy(msg_, msg, total_len);
}
//接收消息协议
//param 协议总长度,当前接收协议长度
Buffer(int32_t total_len)
:total_len_(total_len)
,cur_len_(0)
{
msg_ = new char[total_len];
}
~Buffer() {
delete[] msg_;
}
char* GetMsg() {
return msg_;
}
int32_t GetTotalLen() {
return total_len_;
}
void SetTotalLen(int32_t total_len) {
total_len_ = total_len;
}
int32_t GetCurLen() {
return cur_len_;
}
void SetCurLen(int32_t cur_len) {
cur_len_ = cur_len;
}
private:
//消息协议的首地址
char* msg_;
//消息协议的总长度
int32_t total_len_;
//消息协议的当前发送长度 +上已经发送长度 = total_len (已经处理的长度(已读的长度或者已写的长度))
int32_t cur_len_;
};
接下来为Session添加异步写发送数据操作和负责发送写数据的节点。
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"
class Session {
public:
Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
bool Connect(boost::asio::ip::tcp::endpoint& ep);
//异步写 这个异步写存在问题
void AsyncWriteSomeToSocketErr(const std::string& buffer);
void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
private:
std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
std::shared_ptr<Buffer> send_buffer_;
};
#endif // !__ASYNC_DEMO_H_2023_8_22__
AsyncWriteSomeToSocketErr函数为我们封装的写操作,AsyncWriteSomeToSocketErr为异步写操作回调的函数,为什么会有三个参数呢,我们可以看一下asio源码:
template <typename ConstBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
std::size_t)) WriteToken
BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(WriteToken,
void (boost::system::error_code, std::size_t))
async_write_some(const ConstBufferSequence& buffers,
BOOST_ASIO_MOVE_ARG(WriteToken) token
BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((
async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
declval<initiate_async_send>(), token,
buffers, socket_base::message_flags(0))))
{
return async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
initiate_async_send(this), token,
buffers, socket_base::message_flags(0));
}
async_write_some是异步写的函数,这个异步写函数有两个参数,第一个参数为ConstBufferSequence常引用类型的buffer,就是构造buffer结构。
第二个参数为WriteToken类型,而WriteToken在上面定义了,是一个函数对象类型,返回值为void,参数为error_code和size_t,所以我们为了调用async_write_some函数也要传入一个符合WriteToken定义的函数,就是我们声明的AsyncWriteSomeToSocketErr函数,前两个参数为WriteToken规定的参数,第三个参数为Buffer的智能指针,这样通过智能指针保证我们发送的Buffer数据生命周期延长。
我们看一下AsyncWriteSomeToSocketErr函数的具体实现:
void Session::AsyncWriteSomeToSocketErr(const std::string& buffer) {
//先构造一个发送节点
send_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
//然后构造async_write_some的参数buffer和回调和函数
socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()),
//绑定成员函数的地址,类的对象,参数占位符1,参数占位符2
std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, send_buffer_));
}
//TCP缓冲区 收发端不对等 发11字节 TCP缓冲区只有5字节 那么要分两次发送,假设发送hello world ,第一次只发送hello,\
world未发送,那么如果用户再次调用WriteCallBackErr那么底层不保护发送顺序,那么可能收到的结果hello hello world world \
解决这种就是用一个队列把存储的数据存放到队列里面
void Session::AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer_) {
if (err.value() != 0) {
std::cout << "error occured!error code: " << err.value() << " . message: " << err.what() << std::endl;
return;
}
if (bytes_transferred + buffer_->GetCurLen() < buffer_->GetTotalLen()) {
//buffer_->GetCurLen() = buffer_->GetCurLen() + bytes_transferred;
buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred);
socket_->async_write_some(boost::asio::buffer(buffer_->GetMsg() + buffer_->GetCurLen(), buffer_->GetTotalLen() - buffer_->GetCurLen()),
std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, buffer_));
}
}
这段代码的作用是实现异步发送数据的功能,主要包括两个函数:AsyncWriteSomeToSocketErr 和 AsyncWriteSomeCallBackErr。
-
AsyncWriteSomeToSocketErr 函数的作用是将数据放入发送队列中,并触发异步写操作。具体步骤如下:
- 首先,使用 std::make_shared 创建一个 Buffer 对象,这个对象用于存储要发送的数据。
- 然后,使用 socket_->async_write_some 函数触发异步写操作,将数据写入套接字。在这里,你绑定了回调函数 AsyncWriteSomeCallBackErr。
-
AsyncWriteSomeCallBackErr 函数是异步写操作完成后的回调函数。它的主要作用是处理写操作的结果,检查是否发生错误,以及是否需要继续发送剩余的数据。具体步骤如下:
- 首先,检查 err 参数,如果其值不为 0,表示发送出现错误,就输出错误信息并返回。
- 然后没有错误,检查已传输的字节数 bytes_transferred 加上 buffer_ 对象中已经发送的字节数 buffer_->GetCurLen() 是否小于总的数据长度 buffer_->GetTotalLen()。如果小于总长度,说明还有剩余数据需要发送。
- 如果有剩余数据需要发送,就更新 buffer_ 对象中的已发送字节数 buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred),然后继续触发异步写操作,将剩余的数据发送出去。这里再次调用 socket_->async_write_some 并绑定了相同的回调函数,以便在写操作完成后再次检查和处理。
- 总体来说,这段代码实现了异步发送数据的逻辑,确保了数据的完整性和发送顺序。通过使用回调函数,可以在每次写操作完成后处理相应的逻辑,包括检查错误、更新已发送字节数以及触发下一次写操作。
在AsyncWriteSomeToSocketErr函数里判断如果已经发送的字节数没有达到要发送的总字节数,那么久更新节点已经发送的长度,然后计算剩余要发送的长度,如果有数据未发送完,再次调用async_write_some函数异步发送。
但是这个函数并不能投入实际应用,因为async_write_some回调函数返回已发送的字节数可能并不是全部长度。比如TCP发送缓存区总大小为8字节,但是有3字节未发送(上一次未发送完),这样剩余空间为5字节。
此时我们调用async_write_some发送hello world!实际发送的长度就是为5,也就是只发送了hello,剩余world!通过我们的回调继续发送。
而实际开发的场景用户是不清楚底层tcp的多路复用调用情况的,用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr,很可能在一次没发送完数据还未调用回调函数时再次调用WriteToSocketErr,因为boost::asio封装的时epoll和iocp等多路复用模型。当写事件就绪后就发数据,发送的数据按照async_write_some调用的顺序发送,所以回调函数内调用的async_write_some可能并没有被及时调用。
比如我们如下代码:
//用户发送数据
AsyncWriteSomeToSocketErr("Hello World!");
//用户无感知下层调用情况又一次发送了数据
AsyncWriteSomeToSocketErr("Hello World!");
那么很可能第一次只发送了Hello,后面的数据没发完,第二次发送了Hello World!之后又发送了World!
所以对端收到的数据很可能是HelloHello World! World!
3、异步写void AsyncWriteSomeToSocket(const std::string& buffer)
那怎么解决这个问题呢,我们可以通过队列保证应用层的发送顺序。我们在Session中定义一个发送队列,然后重新定义正确的异步发送函数和回调处理:
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"
class Session {
public:
Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
bool Connect(boost::asio::ip::tcp::endpoint& ep);
//异步写 这个异步写存在问题
void AsyncWriteSomeToSocketErr(const std::string& buffer);
void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
void AsyncWriteSomeToSocket(const std::string& buffer);
void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);
private:
std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
std::shared_ptr<Buffer> send_buffer_;
std::queue<std::shared_ptr<Buffer>> send_queue_;
bool send_padding_;
};
#endif // !__ASYNC_DEMO_H_2023_8_22__
定义了bool变量send_padding_,该变量为true表示一个节点还未发送完,false代表发送完成。send_padding_ 用来缓存要发送的消息协议节点,是一个队列。
我们实现异步发送功能:
Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
:socket_(socket)
,send_padding_(false)
{
send_buffer_ = nullptr;
if (!send_queue_.empty()) {
send_queue_.pop();
}
}
函数实现:
void Session::AsyncWriteSomeToSocket(const std::string& buffer) {
//发送节点插入队列
send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));
//判断是否还有未发完的数据,false,表示没有,true表示还有
if (send_padding_) {
return;
}
//异步发送数据
socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()), std::bind(&Session::AsyncWriteSomeCallBack, this
, std::placeholders::_1, std::placeholders::_2));
send_padding_ = true;
}
void Session::AsyncWriteSomeCallBack(const boost::system::error_code& err, size_t bytes_transferred) {
if (err.value() != 0) {
std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
return;
}
//取出队列中队首元素
std::shared_ptr<Buffer> send_data = send_queue_.front();
send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred);
//数据未发送完,继承调用异步函数取出队首元素发送
if (send_data->GetCurLen() < send_data->GetTotalLen()) {
socket_->async_write_some(boost::asio::buffer(send_data->GetMsg() + send_data->GetCurLen(),
send_data->GetTotalLen() - send_data->GetCurLen()),
std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
return;
}
//如果这个数据发送完了,把数据节点取出来
send_queue_.pop();
//判断队列里面是否还有下一个数据
if (send_queue_.empty()) {
send_padding_ = false;
return;
}
//有数据则继续发送
if (!send_queue_.empty()) {
std::shared_ptr<Buffer> send_data_next = send_queue_.front();
//异步发送的地址偏移
socket_->async_write_some(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),
send_data_next->GetTotalLen() - send_data_next->GetCurLen()),
std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
}
}
这段代码的作用是实现异步发送数据并保证发送顺序的逻辑,主要包括两个函数:AsyncWriteSomeToSocket 和 AsyncWriteSomeCallBack。
-
AsyncWriteSomeToSocket 函数的作用是将数据放入发送队列中,并触发异步写操作。具体步骤如下:
- 首先,将一个新的 Buffer 对象(用于存储要发送的数据)插入到 send_queue_ 队列中。
- 接着,检查是否还有未发完的数据,如果有,说明还在等待前一次异步写操作完成,直接返回。
- 如果没有未发完的数据,说明可以触发异步发送操作,使用 socket_->async_write_some 函数将数据写入套接字,并绑定回调函数 AsyncWriteSomeCallBack。
-
AsyncWriteSomeCallBack 函数是异步写操作完成后的回调函数。其主要作用是处理写操作的结果,继续发送队列中的下一个数据。具体步骤如下:
- 首先,检查 err 参数,如果其值不为 0,表示发送出现错误,就输出错误信息并返回。
- 然后,取出队列中队首元素,该元素是一个 Buffer 对象,表示待发送的数据。
- 接着,更新这个数据的已发送字节数 send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred)。
- 然后,检查数据是否已经全部发送完,如果未发送完,则继续触发异步写操作,将剩余的数据发送出去。
- 如果这个数据已经发送完毕,就从队列中移除这个数据节点,并检查队列是否还有下一个数据。
- 如果队列不为空,表示还有数据需要发送,就取出下一个数据节点,更新已发送字节数,并触发下一个异步写操作,以便发送下一个数据。
这段代码的设计确保了数据的发送顺序,即使在异步发送的情况下也可以保持数据的完整性和顺序。如果发送错误,它也会正确地处理错误情况。
async_write_some函数不能保证每次回调函数触发时发送的长度为要总长度,这样我们每次都要在回调函数判断发送数据是否完成,asio提供了一个更简单的发送函数async_send,这个函数在发送的长度未达到我们要求的长度时就不会触发回调,所以触发回调函数时要么时发送出错了要么是发送完成了,其内部的实现原理就是帮我们不断的调用async_write_some直到完成发送,所以async_send不能和async_write_some混合使用,我们基于async_send封装另外一个发送函数。
4、异步写void AsyncSendToSocket(const std::string& buffer)
函数定义:
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"
class Session {
public:
Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
bool Connect(boost::asio::ip::tcp::endpoint& ep);
//异步写 这个异步写存在问题
void AsyncWriteSomeToSocketErr(const std::string& buffer);
void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
void AsyncWriteSomeToSocket(const std::string& buffer);
void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);
//优先取这个
void AsyncSendToSocket(const std::string& buffer);
void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
private:
std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
std::shared_ptr<Buffer> send_buffer_;
std::queue<std::shared_ptr<Buffer>> send_queue_;
bool send_padding_;
};
#endif // !__ASYNC_DEMO_H_2023_8_22__
函数实现:
void Session::AsyncSendToSocket(const std::string& buffer) {
//把发送消息协议构造成节点插入队列
send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));
//判断是否还有未发完数据
if (send_padding_) {
return;
}
//异步发送数据
socket_->async_send(boost::asio::buffer(buffer.c_str(), buffer.length()),
std::bind(&Session::AsyncSendCallBack, this, std::placeholders::_1, std::placeholders::_2));
send_padding_ = true;
}
void Session::AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
if (0 != err.value()) {
//发送数据失败
std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
return;
}
//因为调用的是async_send()它的设计目标是简化发送数据的过程,\
让用户不必关心数据的细节,只需提供要发送的数据和回调函数即可
send_queue_.pop();
if (send_queue_.empty()) {
send_padding_ = false;
return;
}
if (!send_queue_.empty()) {
std::shared_ptr<Buffer> send_data_next = send_queue_.front();
//异步发送发生地址偏移
socket_->async_send(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),
send_data_next->GetTotalLen() - send_data_next->GetCurLen()),
std::bind(&Session::AsyncSendCallBack, this,std::placeholders::_1, std::placeholders::_2));
}
}
这段代码的目的是实现异步发送数据,并在发送完成后调用回调函数进行处理。这与你之前提到的代码逻辑类似,但使用了 async_send 函数代替了 async_write_some,并且没有需要手动维护已发送字节数。
-
具体的逻辑如下:
-
AsyncSendToSocket 函数用于将数据包装成一个 Buffer 对象并插入发送队列 send_queue_ 中。
-
接着,检查是否已经有数据正在等待发送(send_padding_ 是否为 true),如果是,则说明还在等待前一次异步发送完成,直接返回。
-
如果没有等待发送的数据,就调用 socket_->async_send 函数进行异步发送。这个函数会将数据发送到套接字,并在发送完成后调用回调函数 AsyncSendCallBack。
-
在 AsyncSendCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示发送出现错误,输出错误信息并返回。
-
如果发送成功,就从发送队列中弹出已发送的数据 (send_queue_.pop()),并检查队列是否为空。如果队列为空,说明没有待发送的数据,将 send_padding_ 设置为 false 表示没有数据需要发送。
-
如果队列不为空,表示还有待发送的数据,就取出队列的头部元素,即下一个要发送的数据,然后调用 socket_->async_send 再次异步发送数据。这个过程会重复,直到队列中的数据全部发送完毕。
-
总体而言,这段代码实现了异步发送数据的功能,保证了发送的顺序,同时也能正确处理发送过程中的错误。不同之处在于,它使用了 async_send 函数,该函数封装了发送的细节,使得发送数据更加方便。
5、异步读void AsyncReadSomeToSocket(const std::string& buffer)
接下来介绍异步读操作,异步读操作和异步的写操作类似同样又async_read_some和async_receive函数,前者触发的回调函数获取的读数据的长度可能会小于要求读取的总长度,后者触发的回调函数读取的数据长度等于读取的总长度。
先基于async_read_some封装一个读取的函数AsyncReadSomeToSocket,同样在Session类的声明中添加一些变量:
函数定义:
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"
class Session {
public:
Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
bool Connect(boost::asio::ip::tcp::endpoint& ep);
//异步写 这个异步写存在问题
void AsyncWriteSomeToSocketErr(const std::string& buffer);
void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
void AsyncWriteSomeToSocket(const std::string& buffer);
void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);
//优先取这个
void AsyncSendToSocket(const std::string& buffer);
void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
//异步读 优先取这个
void AsyncReadSomeToSocket(const std::string& buffer);
void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
private:
std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
std::shared_ptr<Buffer> send_buffer_;
std::queue<std::shared_ptr<Buffer>> send_queue_;
std::shared_ptr<Buffer> recv_buffer_;
bool send_padding_;
bool recv_padding_;
};
#endif // !__ASYNC_DEMO_H_2023_8_22__
函数实现:
void Session::AsyncReadSomeToSocket(const std::string& buffer) {
//判断是否正在读数据,这里第一次读数据
if (recv_padding_) {
return;
}
recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
//异步读取数据
socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
recv_padding_ = true;
}
void Session::AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
if (0 != err.value()) {
std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
return;
}
//判断读取的字节数,没有读取完继续读取
recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);
if (recv_buffer_->GetCurLen() < recv_buffer_->GetTotalLen()) {
socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
return;
}
//将数据投递到队列里交给逻辑线程处理,此处略去
//如果读完了则将标记置为false
recv_padding_ = false;
recv_buffer_ = nullptr;
}
这段代码的主要功能是异步读取数据,并在读取完成后调用回调函数 AsyncReadSomeCallBack 处理数据。以下是代码逻辑的详细解释:
-
AsyncReadSomeToSocket 函数用于异步读取数据。在这个函数中,首先检查 recv_padding_ 是否为 true。如果为 true,表示正在读取数据,直接返回,避免重复读取。
-
如果 recv_padding_ 为 false,说明可以开始读取数据。这时,创建一个 Buffer 对象 recv_buffer_,并初始化为要读取的数据。
-
接着,调用 socket_->async_read_some 函数进行异步读取数据。这个函数会在读取完成后调用回调函数 AsyncReadSomeCallBack。
-
在 AsyncReadSomeCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示读取出现错误,输出错误信息并返回。
-
如果读取成功,将已读取的字节数添加到 recv_buffer_ 的当前长度 CurLen 中。然后,检查是否已经读取完所有数据,即 CurLen 是否小于 TotalLen。
-
如果未读取完,继续调用 socket_->async_read_some 函数继续异步读取剩余的数据,直到读取完所有数据。
-
如果读取完了,将 recv_padding_ 置为 false,表示没有正在读取的数据。最后,清空 recv_buffer_ 对象,以便下次读取新的数据。
这段代码实现了异步读取数据的逻辑,确保数据被正确读取并处理。如果数据没有完全读取,它会继续异步读取剩余的部分,直到读取完整个数据。如果有新的数据需要读取,可以再次调用 AsyncReadSomeToSocket。
6、异步读void AsyncReceiveToSocket(const std::string& buffer)
我们基于async_receive再封装一个接收数据的函数:
函数声明:
#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"
class Session {
public:
Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
bool Connect(boost::asio::ip::tcp::endpoint& ep);
//异步写 这个异步写存在问题
void AsyncWriteSomeToSocketErr(const std::string& buffer);
void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
void AsyncWriteSomeToSocket(const std::string& buffer);
void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);
//优先取这个
void AsyncSendToSocket(const std::string& buffer);
void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
//异步读 优先取这个
void AsyncReadSomeToSocket(const std::string& buffer);
void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
void AsyncReceiveToSocket(const std::string& buffer);
void AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
private:
std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
std::shared_ptr<Buffer> send_buffer_;
std::queue<std::shared_ptr<Buffer>> send_queue_;
std::shared_ptr<Buffer> recv_buffer_;
bool send_padding_;
bool recv_padding_;
};
#endif // !__ASYNC_DEMO_H_2023_8_22__
函数实现:
void Session::AsyncReceiveToSocket(const std::string& buffer) {
//判断是否有数据正在读取
if (recv_padding_) {
return;
}
recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
socket_->async_receive(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
std::bind(&Session::AsyncReceiveCallBack, this, std::placeholders::_1, std::placeholders::_2));
recv_padding_ = true;
}
void Session::AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
if (0 != err.value()) {
std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
return;
}
recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);
recv_padding_ = false;
recv_buffer_ = nullptr;
}
这段代码看起来非常类似于前面提到的异步读取数据的代码。它实现了异步接收数据的逻辑,以下是代码的详细解释:
-
AsyncReceiveToSocket 函数用于异步接收数据。首先,它检查 recv_padding_ 是否为 true,如果为 true,表示已经有数据在读取,直接返回,以避免重复接收。
-
如果 recv_padding_ 为 false,说明可以开始接收数据。这时,创建一个 Buffer 对象 recv_buffer_,并初始化为要接收的数据。
接着,调用 socket_->async_receive 函数进行异步接收数据。这个函数会在接收完成后调用回调函数 AsyncReceiveCallBack。
在 AsyncReceiveCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示接收出现错误,输出错误信息并返回。
如果接收成功,将已接收的字节数添加到 recv_buffer_ 的当前长度 CurLen 中。然后,将 recv_padding_ 置为 false,表示没有正在接收的数据。
最后,清空 recv_buffer_ 对象,以便下次接收新的数据。
这段代码实现了异步接收数据的逻辑,确保数据被正确接收并处理。如果数据没有完全接收,它会继续异步接收剩余的部分,直到接收完整个数据。如果有新的数据需要接收,可以再次调用 AsyncReceiveToSocket。
同样async_read_some和async_receive不能混合使用,否则会出现逻辑问题。
7、总结
总体代码声明:
#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"
class Session {
public:
Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
bool Connect(boost::asio::ip::tcp::endpoint& ep);
//异步写 这个异步写存在问题
void AsyncWriteSomeToSocketErr(const std::string& buffer);
void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
void AsyncWriteSomeToSocket(const std::string& buffer);
void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);
//优先取这个
void AsyncSendToSocket(const std::string& buffer);
void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
//异步读 优先取这个
void AsyncReadSomeToSocket(const std::string& buffer);
void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
void AsyncReceiveToSocket(const std::string& buffer);
void AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
private:
std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
std::shared_ptr<Buffer> send_buffer_;
std::queue<std::shared_ptr<Buffer>> send_queue_;
std::shared_ptr<Buffer> recv_buffer_;
bool send_padding_;
bool recv_padding_;
};
#endif // !__ASYNC_DEMO_H_2023_8_22__
总体代码定义:
#include"async_demo.h"
Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
:socket_(socket)
,send_padding_(false)
,recv_padding_(false)
{
send_buffer_ = nullptr;
recv_buffer_ = nullptr;
if (!send_queue_.empty()) {
send_queue_.pop();
}
}
bool Session::Connect(boost::asio::ip::tcp::endpoint& ep) {
socket_->connect(ep);
return true;
}
void Session::AsyncWriteSomeToSocketErr(const std::string& buffer) {
//先构造一个发送节点
send_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
//然后构造async_write_some的参数buffer和回调和函数
socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()),
//绑定成员函数的地址,类的对象,参数占位符1,参数占位符2
std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, send_buffer_));
}
//TCP缓冲区 收发端不对等 发11字节 TCP缓冲区只有5字节 那么要分两次发送,假设发送hello world ,第一次只发送hello,\
world未发送,那么如果用户再次调用WriteCallBackErr那么底层不保护发送顺序,那么可能收到的结果hello hello world world \
解决这种就是用一个队列把存储的数据存放到队列里面
void Session::AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer_) {
if (err.value() != 0) {
std::cout << "error occured!error code: " << err.value() << " . message: " << err.what() << std::endl;
return;
}
if (bytes_transferred + buffer_->GetCurLen() < buffer_->GetTotalLen()) {
//buffer_->GetCurLen() = buffer_->GetCurLen() + bytes_transferred;
buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred);
socket_->async_write_some(boost::asio::buffer(buffer_->GetMsg() + buffer_->GetCurLen(), buffer_->GetTotalLen() - buffer_->GetCurLen()),
std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, buffer_));
}
}
void Session::AsyncWriteSomeToSocket(const std::string& buffer) {
//发送节点插入队列
send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));
//判断是否还有未发完的数据,false,表示没有,true表示还有
if (send_padding_) {
return;
}
//异步发送数据
socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()), std::bind(&Session::AsyncWriteSomeCallBack, this
, std::placeholders::_1, std::placeholders::_2));
send_padding_ = true;
}
void Session::AsyncWriteSomeCallBack(const boost::system::error_code& err, size_t bytes_transferred) {
if (err.value() != 0) {
std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
return;
}
//取出队列中队首元素
std::shared_ptr<Buffer> send_data = send_queue_.front();
send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred);
//数据未发送完,继承调用异步函数取出队首元素发送
if (send_data->GetCurLen() < send_data->GetTotalLen()) {
socket_->async_write_some(boost::asio::buffer(send_data->GetMsg() + send_data->GetCurLen(),
send_data->GetTotalLen() - send_data->GetCurLen()),
std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
return;
}
//如果这个数据发送完了,把数据节点取出来
send_queue_.pop();
//判断队列里面是否还有下一个数据
if (send_queue_.empty()) {
send_padding_ = false;
return;
}
//有数据则继续发送
if (!send_queue_.empty()) {
std::shared_ptr<Buffer> send_data_next = send_queue_.front();
//异步发送的地址偏移
socket_->async_write_some(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),
send_data_next->GetTotalLen() - send_data_next->GetCurLen()),
std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
}
}
void Session::AsyncSendToSocket(const std::string& buffer) {
//把发送消息协议构造成节点插入队列
send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));
//判断是否还有未发完数据
if (send_padding_) {
return;
}
//异步发送数据
socket_->async_send(boost::asio::buffer(buffer.c_str(), buffer.length()),
std::bind(&Session::AsyncSendCallBack, this, std::placeholders::_1, std::placeholders::_2));
send_padding_ = true;
}
void Session::AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
if (0 != err.value()) {
//发送数据失败
std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
return;
}
//因为调用的是async_send()它的设计目标是简化发送数据的过程,\
让用户不必关心数据的细节,只需提供要发送的数据和回调函数即可
send_queue_.pop();
if (send_queue_.empty()) {
send_padding_ = false;
return;
}
if (!send_queue_.empty()) {
std::shared_ptr<Buffer> send_data_next = send_queue_.front();
//异步发送发生地址偏移
socket_->async_send(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),
send_data_next->GetTotalLen() - send_data_next->GetCurLen()),
std::bind(&Session::AsyncSendCallBack, this,std::placeholders::_1, std::placeholders::_2));
}
}
void Session::AsyncReadSomeToSocket(const std::string& buffer) {
//判断是否正在读数据,这里第一次读数据
if (recv_padding_) {
return;
}
recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
//异步读取数据
socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
recv_padding_ = true;
}
void Session::AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
if (0 != err.value()) {
std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
return;
}
//判断读取的字节数,没有读取完继续读取
recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);
if (recv_buffer_->GetCurLen() < recv_buffer_->GetTotalLen()) {
socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
return;
}
//将数据投递到队列里交给逻辑线程处理,此处略去
//如果读完了则将标记置为false
recv_padding_ = false;
recv_buffer_ = nullptr;
}
void Session::AsyncReceiveToSocket(const std::string& buffer) {
//判断是否有数据正在读取
if (recv_padding_) {
return;
}
recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
socket_->async_receive(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
std::bind(&Session::AsyncReceiveCallBack, this, std::placeholders::_1, std::placeholders::_2));
recv_padding_ = true;
}
void Session::AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
if (0 != err.value()) {
std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
return;
}
recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);
recv_padding_ = false;
recv_buffer_ = nullptr;
}
本文介绍了boost asio异步读写的操作,仅仅是代码片段和api的封装便于大家理解,下一篇利用这些异步api写一个异步的服务器展示收发效果。