【boost网络库从青铜到王者】第六篇:asio网络编程中的socket异步读(接收)写(发送)

news2024/11/25 0:53:01

文章目录

  • 1、简介
  • 2、异步写 void AsyncWriteSomeToSocketErr(const std::string& buffer)
  • 3、异步写void AsyncWriteSomeToSocket(const std::string& buffer)
  • 4、异步写void AsyncSendToSocket(const std::string& buffer)
  • 5、异步读void AsyncReadSomeToSocket(const std::string& buffer)
  • 6、异步读void AsyncReceiveToSocket(const std::string& buffer)
  • 7、总结

1、简介

本文介绍boost asio的异步读写操作及注意事项,为保证知识便于读者吸收,仅介绍api使用的代码片段。下一节再编写完整的客户端和服务器程序。

所以我们定义一个session类,这个session类表示服务器处理客户端连接的管理类

#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"

class Session {
public:
	Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
	bool Connect(boost::asio::ip::tcp::endpoint& ep);
	private:
	std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
	std::shared_ptr<Buffer> send_buffer_;
	};

#endif // !__ASYNC_DEMO_H_2023_8_22__

session类定义了一个socket成员变量,负责处理对端**(ip+端口)的连接读写,封装了Connect**函数:

#include"async_demo.h"

Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
	:socket_(socket)
{
	send_buffer_ = nullptr;
}

bool Session::Connect(boost::asio::ip::tcp::endpoint& ep) {
	socket_->connect(ep);

	return true;
}

这里只是简单意思一下,下面核心介绍异步读写api的使用。

2、异步写 void AsyncWriteSomeToSocketErr(const std::string& buffer)

在写操作前,我们先封装一个Buffer结构。用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)

写了两个构造函数,两个参数的负责构造写节点,一个参数的负责构造读节点。

#pragma once
#include<iostream>

//trv
const int RECVSIZE = 1024;
class Buffer {
public:
	//发送消息协议
	//param 协议首地址,协议总长度
	Buffer(const char* msg,int32_t total_len)
		:msg_(new char[total_len])
		,total_len_(total_len)
		,cur_len_(0)
	{
		memcpy(msg_, msg, total_len);
	}
	//接收消息协议
	//param 协议总长度,当前接收协议长度
	Buffer(int32_t total_len)
		:total_len_(total_len)
		,cur_len_(0)
	{
		msg_ = new char[total_len];
	}

	~Buffer() {
		delete[] msg_;
	}

	char* GetMsg() {
		return msg_;
	}

	int32_t GetTotalLen() {
		return total_len_;
	}

	void SetTotalLen(int32_t total_len) {
		total_len_ = total_len;
	}

	int32_t GetCurLen() {
		return cur_len_;
	}

	void SetCurLen(int32_t cur_len) {
		cur_len_ = cur_len;
	}
private:
	//消息协议的首地址
	char* msg_;
	//消息协议的总长度
	int32_t total_len_;
	//消息协议的当前发送长度 +上已经发送长度 = total_len (已经处理的长度(已读的长度或者已写的长度))
	int32_t cur_len_;
};

接下来为Session添加异步写发送数据操作和负责发送写数据的节点。

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"

class Session {
public:
	Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
	bool Connect(boost::asio::ip::tcp::endpoint& ep);

	//异步写  这个异步写存在问题
	void AsyncWriteSomeToSocketErr(const std::string& buffer);
	void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
private:
	std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
	std::shared_ptr<Buffer> send_buffer_;
};

#endif // !__ASYNC_DEMO_H_2023_8_22__

在这里插入图片描述

AsyncWriteSomeToSocketErr函数为我们封装的写操作,AsyncWriteSomeToSocketErr为异步写操作回调的函数,为什么会有三个参数呢,我们可以看一下asio源码:

  template <typename ConstBufferSequence,
      BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
        std::size_t)) WriteToken
          BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(WriteToken,
      void (boost::system::error_code, std::size_t))
  async_write_some(const ConstBufferSequence& buffers,
      BOOST_ASIO_MOVE_ARG(WriteToken) token
        BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
    BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((
      async_initiate<WriteToken,
        void (boost::system::error_code, std::size_t)>(
          declval<initiate_async_send>(), token,
          buffers, socket_base::message_flags(0))))
  {
    return async_initiate<WriteToken,
      void (boost::system::error_code, std::size_t)>(
        initiate_async_send(this), token,
        buffers, socket_base::message_flags(0));
  }

async_write_some是异步写的函数,这个异步写函数有两个参数,第一个参数为ConstBufferSequence常引用类型的buffer,就是构造buffer结构。

第二个参数为WriteToken类型,而WriteToken在上面定义了,是一个函数对象类型,返回值为void,参数为error_codesize_t,所以我们为了调用async_write_some函数也要传入一个符合WriteToken定义的函数,就是我们声明的AsyncWriteSomeToSocketErr函数,前两个参数为WriteToken规定的参数,第三个参数为Buffer的智能指针,这样通过智能指针保证我们发送的Buffer数据生命周期延长。

我们看一下AsyncWriteSomeToSocketErr函数的具体实现:

void Session::AsyncWriteSomeToSocketErr(const std::string& buffer) {
	//先构造一个发送节点
	send_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
	//然后构造async_write_some的参数buffer和回调和函数
	socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()),
		//绑定成员函数的地址,类的对象,参数占位符1,参数占位符2
		std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, send_buffer_));
}

//TCP缓冲区 收发端不对等 发11字节 TCP缓冲区只有5字节 那么要分两次发送,假设发送hello world ,第一次只发送hello,\
world未发送,那么如果用户再次调用WriteCallBackErr那么底层不保护发送顺序,那么可能收到的结果hello hello world world \
解决这种就是用一个队列把存储的数据存放到队列里面
void Session::AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer_) {
	if (err.value() != 0) {
		std::cout << "error occured!error code: " << err.value() << " . message: " << err.what() << std::endl;
		return;
	}
	if (bytes_transferred + buffer_->GetCurLen() < buffer_->GetTotalLen()) {
		//buffer_->GetCurLen() = buffer_->GetCurLen() + bytes_transferred;
		buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred);
		socket_->async_write_some(boost::asio::buffer(buffer_->GetMsg() + buffer_->GetCurLen(), buffer_->GetTotalLen() - buffer_->GetCurLen()),
			std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, buffer_));
	}
}

这段代码的作用是实现异步发送数据的功能,主要包括两个函数:AsyncWriteSomeToSocketErrAsyncWriteSomeCallBackErr

  • AsyncWriteSomeToSocketErr 函数的作用是将数据放入发送队列中,并触发异步写操作。具体步骤如下:

    • 首先,使用 std::make_shared 创建一个 Buffer 对象,这个对象用于存储要发送的数据。
    • 然后,使用 socket_->async_write_some 函数触发异步写操作,将数据写入套接字。在这里,你绑定了回调函数 AsyncWriteSomeCallBackErr
  • AsyncWriteSomeCallBackErr 函数是异步写操作完成后的回调函数。它的主要作用是处理写操作的结果,检查是否发生错误,以及是否需要继续发送剩余的数据。具体步骤如下:

    • 首先,检查 err 参数,如果其值不为 0,表示发送出现错误,就输出错误信息并返回。
    • 然后没有错误,检查已传输的字节数 bytes_transferred 加上 buffer_ 对象中已经发送的字节数 buffer_->GetCurLen() 是否小于总的数据长度 buffer_->GetTotalLen()。如果小于总长度,说明还有剩余数据需要发送。
    • 如果有剩余数据需要发送,就更新 buffer_ 对象中的已发送字节数 buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred),然后继续触发异步写操作,将剩余的数据发送出去。这里再次调用 socket_->async_write_some 并绑定了相同的回调函数,以便在写操作完成后再次检查和处理。
    • 总体来说,这段代码实现了异步发送数据的逻辑,确保了数据的完整性和发送顺序。通过使用回调函数,可以在每次写操作完成后处理相应的逻辑,包括检查错误、更新已发送字节数以及触发下一次写操作。

AsyncWriteSomeToSocketErr函数里判断如果已经发送的字节数没有达到要发送的总字节数,那么久更新节点已经发送的长度,然后计算剩余要发送的长度,如果有数据未发送完,再次调用async_write_some函数异步发送。

但是这个函数并不能投入实际应用,因为async_write_some回调函数返回已发送的字节数可能并不是全部长度。比如TCP发送缓存区总大小为8字节,但是有3字节未发送(上一次未发送完),这样剩余空间为5字节。
在这里插入图片描述
此时我们调用async_write_some发送hello world!实际发送的长度就是为5,也就是只发送了hello,剩余world!通过我们的回调继续发送。

而实际开发的场景用户是不清楚底层tcp的多路复用调用情况的,用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr,很可能在一次没发送完数据还未调用回调函数时再次调用WriteToSocketErr,因为boost::asio封装的时epoll和iocp等多路复用模型。当写事件就绪后就发数据,发送的数据按照async_write_some调用的顺序发送,所以回调函数内调用的async_write_some可能并没有被及时调用。

比如我们如下代码:

//用户发送数据
AsyncWriteSomeToSocketErr("Hello World!");
//用户无感知下层调用情况又一次发送了数据
AsyncWriteSomeToSocketErr("Hello World!");

那么很可能第一次只发送了Hello,后面的数据没发完,第二次发送了Hello World!之后又发送了World!

所以对端收到的数据很可能是HelloHello World! World!

3、异步写void AsyncWriteSomeToSocket(const std::string& buffer)

那怎么解决这个问题呢,我们可以通过队列保证应用层的发送顺序。我们在Session中定义一个发送队列,然后重新定义正确的异步发送函数和回调处理:

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"

class Session {
public:
	Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
	bool Connect(boost::asio::ip::tcp::endpoint& ep);

	//异步写  这个异步写存在问题
	void AsyncWriteSomeToSocketErr(const std::string& buffer);
	void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
	
	void AsyncWriteSomeToSocket(const std::string& buffer);
	void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);
private:
	std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
	std::shared_ptr<Buffer> send_buffer_;
	std::queue<std::shared_ptr<Buffer>> send_queue_;
	bool send_padding_;
};

#endif // !__ASYNC_DEMO_H_2023_8_22__

定义了bool变量send_padding_,该变量为true表示一个节点还未发送完,false代表发送完成。send_padding_ 用来缓存要发送的消息协议节点,是一个队列。

我们实现异步发送功能:

Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
	:socket_(socket)
	,send_padding_(false)
{
	send_buffer_ = nullptr;
	if (!send_queue_.empty()) {
		send_queue_.pop();
	}
}

函数实现:

void Session::AsyncWriteSomeToSocket(const std::string& buffer) {
	//发送节点插入队列
	send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));

	//判断是否还有未发完的数据,false,表示没有,true表示还有
	if (send_padding_) {
		return;
	}

	//异步发送数据
	socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()), std::bind(&Session::AsyncWriteSomeCallBack, this
		, std::placeholders::_1, std::placeholders::_2));
	send_padding_ = true;
}

void Session::AsyncWriteSomeCallBack(const boost::system::error_code& err, size_t bytes_transferred) {
	if (err.value() != 0) {
		std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
		return;
	}
	//取出队列中队首元素
	std::shared_ptr<Buffer> send_data = send_queue_.front();
	send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred);
	//数据未发送完,继承调用异步函数取出队首元素发送
	if (send_data->GetCurLen() < send_data->GetTotalLen()) {
		socket_->async_write_some(boost::asio::buffer(send_data->GetMsg() + send_data->GetCurLen(),
			send_data->GetTotalLen() - send_data->GetCurLen()),
			std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}

	//如果这个数据发送完了,把数据节点取出来
	send_queue_.pop();
	//判断队列里面是否还有下一个数据
	if (send_queue_.empty()) {
		send_padding_ = false;
		return;
	}
	//有数据则继续发送
	if (!send_queue_.empty()) {
		std::shared_ptr<Buffer> send_data_next = send_queue_.front();
		//异步发送的地址偏移
		socket_->async_write_some(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),
			send_data_next->GetTotalLen() - send_data_next->GetCurLen()),
			std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
	}
}

这段代码的作用是实现异步发送数据并保证发送顺序的逻辑,主要包括两个函数:AsyncWriteSomeToSocketAsyncWriteSomeCallBack

  • AsyncWriteSomeToSocket 函数的作用是将数据放入发送队列中,并触发异步写操作。具体步骤如下:

    • 首先,将一个新的 Buffer 对象(用于存储要发送的数据)插入到 send_queue_ 队列中。
    • 接着,检查是否还有未发完的数据,如果有,说明还在等待前一次异步写操作完成,直接返回。
    • 如果没有未发完的数据,说明可以触发异步发送操作,使用 socket_->async_write_some 函数将数据写入套接字,并绑定回调函数 AsyncWriteSomeCallBack
  • AsyncWriteSomeCallBack 函数是异步写操作完成后的回调函数。其主要作用是处理写操作的结果,继续发送队列中的下一个数据。具体步骤如下:

    • 首先,检查 err 参数,如果其值不为 0,表示发送出现错误,就输出错误信息并返回。
    • 然后,取出队列中队首元素,该元素是一个 Buffer 对象,表示待发送的数据。
    • 接着,更新这个数据的已发送字节数 send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred)
    • 然后,检查数据是否已经全部发送完,如果未发送完,则继续触发异步写操作,将剩余的数据发送出去。
    • 如果这个数据已经发送完毕,就从队列中移除这个数据节点,并检查队列是否还有下一个数据。
    • 如果队列不为空,表示还有数据需要发送,就取出下一个数据节点,更新已发送字节数,并触发下一个异步写操作,以便发送下一个数据。

这段代码的设计确保了数据的发送顺序,即使在异步发送的情况下也可以保持数据的完整性和顺序。如果发送错误,它也会正确地处理错误情况。

async_write_some函数不能保证每次回调函数触发时发送的长度为要总长度,这样我们每次都要在回调函数判断发送数据是否完成,asio提供了一个更简单的发送函数async_send,这个函数在发送的长度未达到我们要求的长度时就不会触发回调,所以触发回调函数时要么时发送出错了要么是发送完成了,其内部的实现原理就是帮我们不断的调用async_write_some直到完成发送,所以async_send不能和async_write_some混合使用,我们基于async_send封装另外一个发送函数。

4、异步写void AsyncSendToSocket(const std::string& buffer)

函数定义:

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"

class Session {
public:
	Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
	bool Connect(boost::asio::ip::tcp::endpoint& ep);

	//异步写  这个异步写存在问题
	void AsyncWriteSomeToSocketErr(const std::string& buffer);
	void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
	
	void AsyncWriteSomeToSocket(const std::string& buffer);
	void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);

	//优先取这个
	void AsyncSendToSocket(const std::string& buffer);
	void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
private:
	std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
	std::shared_ptr<Buffer> send_buffer_;
	std::queue<std::shared_ptr<Buffer>> send_queue_;
	bool send_padding_;
};

#endif // !__ASYNC_DEMO_H_2023_8_22__

函数实现:

void Session::AsyncSendToSocket(const std::string& buffer) {
	//把发送消息协议构造成节点插入队列
	send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));

	//判断是否还有未发完数据
	if (send_padding_) {
		return;
	}

	//异步发送数据
	socket_->async_send(boost::asio::buffer(buffer.c_str(), buffer.length()),
		std::bind(&Session::AsyncSendCallBack, this, std::placeholders::_1, std::placeholders::_2));
	send_padding_ = true;
}

void Session::AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
	if (0 != err.value()) {
		//发送数据失败
		std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
		return;
	}

	//因为调用的是async_send()它的设计目标是简化发送数据的过程,\
	让用户不必关心数据的细节,只需提供要发送的数据和回调函数即可
	send_queue_.pop();
	if (send_queue_.empty()) {
		send_padding_ = false;
		return;
	}
	if (!send_queue_.empty()) {
		std::shared_ptr<Buffer> send_data_next = send_queue_.front();
		//异步发送发生地址偏移
		socket_->async_send(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),
			send_data_next->GetTotalLen() - send_data_next->GetCurLen()),
			std::bind(&Session::AsyncSendCallBack, this,std::placeholders::_1, std::placeholders::_2));
	}
}

这段代码的目的是实现异步发送数据,并在发送完成后调用回调函数进行处理。这与你之前提到的代码逻辑类似,但使用了 async_send 函数代替了 async_write_some,并且没有需要手动维护已发送字节数。

  • 具体的逻辑如下:

    • AsyncSendToSocket 函数用于将数据包装成一个 Buffer 对象并插入发送队列 send_queue_ 中。

    • 接着,检查是否已经有数据正在等待发送(send_padding_ 是否为 true),如果是,则说明还在等待前一次异步发送完成,直接返回。

    • 如果没有等待发送的数据,就调用 socket_->async_send 函数进行异步发送。这个函数会将数据发送到套接字,并在发送完成后调用回调函数 AsyncSendCallBack

    • AsyncSendCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示发送出现错误,输出错误信息并返回。

    • 如果发送成功,就从发送队列中弹出已发送的数据 (send_queue_.pop()),并检查队列是否为空。如果队列为空,说明没有待发送的数据,将 send_padding_ 设置为 false 表示没有数据需要发送。

    • 如果队列不为空,表示还有待发送的数据,就取出队列的头部元素,即下一个要发送的数据,然后调用 socket_->async_send 再次异步发送数据。这个过程会重复,直到队列中的数据全部发送完毕。

总体而言,这段代码实现了异步发送数据的功能,保证了发送的顺序,同时也能正确处理发送过程中的错误。不同之处在于,它使用了 async_send 函数,该函数封装了发送的细节,使得发送数据更加方便。

5、异步读void AsyncReadSomeToSocket(const std::string& buffer)

接下来介绍异步读操作,异步读操作和异步的写操作类似同样又async_read_someasync_receive函数,前者触发的回调函数获取的读数据的长度可能会小于要求读取的总长度,后者触发的回调函数读取的数据长度等于读取的总长度。

先基于async_read_some封装一个读取的函数AsyncReadSomeToSocket,同样在Session类的声明中添加一些变量:

函数定义:

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"

class Session {
public:
	Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
	bool Connect(boost::asio::ip::tcp::endpoint& ep);

	//异步写  这个异步写存在问题
	void AsyncWriteSomeToSocketErr(const std::string& buffer);
	void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
	
	void AsyncWriteSomeToSocket(const std::string& buffer);
	void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);

	//优先取这个
	void AsyncSendToSocket(const std::string& buffer);
	void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);

	//异步读 优先取这个
	void AsyncReadSomeToSocket(const std::string& buffer);
	void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);

private:
	std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
	std::shared_ptr<Buffer> send_buffer_;
	std::queue<std::shared_ptr<Buffer>> send_queue_;
	std::shared_ptr<Buffer> recv_buffer_;
	bool send_padding_;
	bool recv_padding_;
};

#endif // !__ASYNC_DEMO_H_2023_8_22__

函数实现:

void Session::AsyncReadSomeToSocket(const std::string& buffer) {
	
	//判断是否正在读数据,这里第一次读数据
	if (recv_padding_) {
		return;
	}
	recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
	
	//异步读取数据
	socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
		recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
		std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
	recv_padding_ = true;
}

void Session::AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
	if (0 != err.value()) {
		std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
		return;
	}
	//判断读取的字节数,没有读取完继续读取
	recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);
	if (recv_buffer_->GetCurLen() < recv_buffer_->GetTotalLen()) {
		socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
			recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
			std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}
	//将数据投递到队列里交给逻辑线程处理,此处略去
	//如果读完了则将标记置为false
	recv_padding_ = false;
	recv_buffer_ = nullptr;
}

这段代码的主要功能是异步读取数据,并在读取完成后调用回调函数 AsyncReadSomeCallBack 处理数据。以下是代码逻辑的详细解释:

  • AsyncReadSomeToSocket 函数用于异步读取数据。在这个函数中,首先检查 recv_padding_ 是否为 true。如果为 true,表示正在读取数据,直接返回,避免重复读取。

  • 如果 recv_padding_false,说明可以开始读取数据。这时,创建一个 Buffer 对象 recv_buffer_,并初始化为要读取的数据。

  • 接着,调用 socket_->async_read_some 函数进行异步读取数据。这个函数会在读取完成后调用回调函数 AsyncReadSomeCallBack

  • AsyncReadSomeCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示读取出现错误,输出错误信息并返回。

  • 如果读取成功,将已读取的字节数添加到 recv_buffer_ 的当前长度 CurLen 中。然后,检查是否已经读取完所有数据,即 CurLen 是否小于 TotalLen

  • 如果未读取完,继续调用 socket_->async_read_some 函数继续异步读取剩余的数据,直到读取完所有数据。

  • 如果读取完了,将 recv_padding_ 置为 false,表示没有正在读取的数据。最后,清空 recv_buffer_ 对象,以便下次读取新的数据。

这段代码实现了异步读取数据的逻辑,确保数据被正确读取并处理。如果数据没有完全读取,它会继续异步读取剩余的部分,直到读取完整个数据。如果有新的数据需要读取,可以再次调用 AsyncReadSomeToSocket

6、异步读void AsyncReceiveToSocket(const std::string& buffer)

我们基于async_receive再封装一个接收数据的函数:

函数声明:

#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"

class Session {
public:
	Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
	bool Connect(boost::asio::ip::tcp::endpoint& ep);

	//异步写  这个异步写存在问题
	void AsyncWriteSomeToSocketErr(const std::string& buffer);
	void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);

	void AsyncWriteSomeToSocket(const std::string& buffer);
	void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);

	//优先取这个
	void AsyncSendToSocket(const std::string& buffer);
	void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);

	//异步读 优先取这个
	void AsyncReadSomeToSocket(const std::string& buffer);
	void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);

	void AsyncReceiveToSocket(const std::string& buffer);
	void AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred);

private:
	std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
	std::shared_ptr<Buffer> send_buffer_;
	std::queue<std::shared_ptr<Buffer>> send_queue_;
	std::shared_ptr<Buffer> recv_buffer_;
	bool send_padding_;
	bool recv_padding_;
};

#endif // !__ASYNC_DEMO_H_2023_8_22__

函数实现:

void Session::AsyncReceiveToSocket(const std::string& buffer) {
	//判断是否有数据正在读取
	if (recv_padding_) {
		return;
	}
	recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
	socket_->async_receive(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
		recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
		std::bind(&Session::AsyncReceiveCallBack, this, std::placeholders::_1, std::placeholders::_2));
	recv_padding_ = true;
}

void Session::AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
	if (0 != err.value()) {
		std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
		return;
	}
	recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);

	recv_padding_ = false;
	recv_buffer_ = nullptr;
}

这段代码看起来非常类似于前面提到的异步读取数据的代码。它实现了异步接收数据的逻辑,以下是代码的详细解释:

  • AsyncReceiveToSocket 函数用于异步接收数据。首先,它检查 recv_padding_ 是否为 true,如果为 true,表示已经有数据在读取,直接返回,以避免重复接收。

  • 如果 recv_padding_false,说明可以开始接收数据。这时,创建一个 Buffer 对象 recv_buffer_,并初始化为要接收的数据。

接着,调用 socket_->async_receive 函数进行异步接收数据。这个函数会在接收完成后调用回调函数 AsyncReceiveCallBack

AsyncReceiveCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示接收出现错误,输出错误信息并返回。

如果接收成功,将已接收的字节数添加到 recv_buffer_ 的当前长度 CurLen 中。然后,将 recv_padding_ 置为 false,表示没有正在接收的数据。

最后,清空 recv_buffer_ 对象,以便下次接收新的数据。

这段代码实现了异步接收数据的逻辑,确保数据被正确接收并处理。如果数据没有完全接收,它会继续异步接收剩余的部分,直到接收完整个数据。如果有新的数据需要接收,可以再次调用 AsyncReceiveToSocket

同样async_read_someasync_receive不能混合使用,否则会出现逻辑问题。

7、总结

总体代码声明:

#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"

class Session {
public:
	Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);
	bool Connect(boost::asio::ip::tcp::endpoint& ep);

	//异步写  这个异步写存在问题
	void AsyncWriteSomeToSocketErr(const std::string& buffer);
	void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);

	void AsyncWriteSomeToSocket(const std::string& buffer);
	void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);

	//优先取这个
	void AsyncSendToSocket(const std::string& buffer);
	void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);

	//异步读 优先取这个
	void AsyncReadSomeToSocket(const std::string& buffer);
	void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);

	void AsyncReceiveToSocket(const std::string& buffer);
	void AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred);

private:
	std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
	std::shared_ptr<Buffer> send_buffer_;
	std::queue<std::shared_ptr<Buffer>> send_queue_;
	std::shared_ptr<Buffer> recv_buffer_;
	bool send_padding_;
	bool recv_padding_;
};

#endif // !__ASYNC_DEMO_H_2023_8_22__

总体代码定义:

#include"async_demo.h"

Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
	:socket_(socket)
	,send_padding_(false)
	,recv_padding_(false)
{
	send_buffer_ = nullptr;
	recv_buffer_ = nullptr;
	if (!send_queue_.empty()) {
		send_queue_.pop();
	}
}

bool Session::Connect(boost::asio::ip::tcp::endpoint& ep) {
	socket_->connect(ep);

	return true;
}

void Session::AsyncWriteSomeToSocketErr(const std::string& buffer) {
	//先构造一个发送节点
	send_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
	//然后构造async_write_some的参数buffer和回调和函数
	socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()),
		//绑定成员函数的地址,类的对象,参数占位符1,参数占位符2
		std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, send_buffer_));
}

//TCP缓冲区 收发端不对等 发11字节 TCP缓冲区只有5字节 那么要分两次发送,假设发送hello world ,第一次只发送hello,\
world未发送,那么如果用户再次调用WriteCallBackErr那么底层不保护发送顺序,那么可能收到的结果hello hello world world \
解决这种就是用一个队列把存储的数据存放到队列里面
void Session::AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer_) {
	if (err.value() != 0) {
		std::cout << "error occured!error code: " << err.value() << " . message: " << err.what() << std::endl;
		return;
	}
	if (bytes_transferred + buffer_->GetCurLen() < buffer_->GetTotalLen()) {
		//buffer_->GetCurLen() = buffer_->GetCurLen() + bytes_transferred;
		buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred);
		socket_->async_write_some(boost::asio::buffer(buffer_->GetMsg() + buffer_->GetCurLen(), buffer_->GetTotalLen() - buffer_->GetCurLen()),
			std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, buffer_));
	}
}

void Session::AsyncWriteSomeToSocket(const std::string& buffer) {
	//发送节点插入队列
	send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));

	//判断是否还有未发完的数据,false,表示没有,true表示还有
	if (send_padding_) {
		return;
	}

	//异步发送数据
	socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()), std::bind(&Session::AsyncWriteSomeCallBack, this
		, std::placeholders::_1, std::placeholders::_2));
	send_padding_ = true;
}

void Session::AsyncWriteSomeCallBack(const boost::system::error_code& err, size_t bytes_transferred) {
	if (err.value() != 0) {
		std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
		return;
	}
	//取出队列中队首元素
	std::shared_ptr<Buffer> send_data = send_queue_.front();
	send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred);
	//数据未发送完,继承调用异步函数取出队首元素发送
	if (send_data->GetCurLen() < send_data->GetTotalLen()) {
		socket_->async_write_some(boost::asio::buffer(send_data->GetMsg() + send_data->GetCurLen(),
			send_data->GetTotalLen() - send_data->GetCurLen()),
			std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}

	//如果这个数据发送完了,把数据节点取出来
	send_queue_.pop();
	//判断队列里面是否还有下一个数据
	if (send_queue_.empty()) {
		send_padding_ = false;
		return;
	}
	//有数据则继续发送
	if (!send_queue_.empty()) {
		std::shared_ptr<Buffer> send_data_next = send_queue_.front();
		//异步发送的地址偏移
		socket_->async_write_some(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),
			send_data_next->GetTotalLen() - send_data_next->GetCurLen()),
			std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
	}
}

void Session::AsyncSendToSocket(const std::string& buffer) {
	//把发送消息协议构造成节点插入队列
	send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));

	//判断是否还有未发完数据
	if (send_padding_) {
		return;
	}

	//异步发送数据
	socket_->async_send(boost::asio::buffer(buffer.c_str(), buffer.length()),
		std::bind(&Session::AsyncSendCallBack, this, std::placeholders::_1, std::placeholders::_2));
	send_padding_ = true;
}

void Session::AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
	if (0 != err.value()) {
		//发送数据失败
		std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
		return;
	}

	//因为调用的是async_send()它的设计目标是简化发送数据的过程,\
	让用户不必关心数据的细节,只需提供要发送的数据和回调函数即可
	send_queue_.pop();
	if (send_queue_.empty()) {
		send_padding_ = false;
		return;
	}
	if (!send_queue_.empty()) {
		std::shared_ptr<Buffer> send_data_next = send_queue_.front();
		//异步发送发生地址偏移
		socket_->async_send(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),
			send_data_next->GetTotalLen() - send_data_next->GetCurLen()),
			std::bind(&Session::AsyncSendCallBack, this,std::placeholders::_1, std::placeholders::_2));
	}
}

void Session::AsyncReadSomeToSocket(const std::string& buffer) {
	
	//判断是否正在读数据,这里第一次读数据
	if (recv_padding_) {
		return;
	}
	recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
	
	//异步读取数据
	socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
		recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
		std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
	recv_padding_ = true;
}

void Session::AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
	if (0 != err.value()) {
		std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
		return;
	}
	//判断读取的字节数,没有读取完继续读取
	recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);
	if (recv_buffer_->GetCurLen() < recv_buffer_->GetTotalLen()) {
		socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
			recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
			std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}
	//将数据投递到队列里交给逻辑线程处理,此处略去
	//如果读完了则将标记置为false
	recv_padding_ = false;
	recv_buffer_ = nullptr;
}

void Session::AsyncReceiveToSocket(const std::string& buffer) {
	//判断是否有数据正在读取
	if (recv_padding_) {
		return;
	}
	recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());
	socket_->async_receive(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),
		recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),
		std::bind(&Session::AsyncReceiveCallBack, this, std::placeholders::_1, std::placeholders::_2));
	recv_padding_ = true;
}

void Session::AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {
	if (0 != err.value()) {
		std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;
		return;
	}
	recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);

	recv_padding_ = false;
	recv_buffer_ = nullptr;
}

本文介绍了boost asio异步读写的操作,仅仅是代码片段和api的封装便于大家理解,下一篇利用这些异步api写一个异步的服务器展示收发效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/923050.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java 8 Stream 之 collect() 的奇技淫巧

来源&#xff1a;blog.csdn.net/qq_35387940/ article/details/127008965 前言 正文 第一个小玩法 第二个小玩法 前言 本身我是一个比较偏向少使用Stream的人&#xff0c;因为调试比较不方便。 但是, 不得不说&#xff0c;stream确实会给我们编码带来便捷。 所以还是忍…

python开发环境搭建

1、安装python 下载地址&#xff1a;https://www.python.org/downloads/windows/ 双击安装。 安装完验证&#xff1a; 2、安装IDE 下载地圵&#xff1a;https://www.jetbrains.com/zh-cn/pycharm/download/?sectionwindows 免费版本 3、安装依赖包 在项目的根目录下…

【RuoYi移动端】HBuild工具插件安装和系统配置manifest.json

一、点【工具】-【插件安装】安装如下工具 二、点【manifest.json】

搭建Tomcat HTTP服务:在Windows上实现外网远程访问的详细配置与设置教程

文章目录 前言1.本地Tomcat网页搭建1.1 Tomcat安装1.2 配置环境变量1.3 环境配置1.4 Tomcat运行测试1.5 Cpolar安装和注册 2.本地网页发布2.1.Cpolar云端设置2.2 Cpolar本地设置 3.公网访问测试4.结语 前言 Tomcat作为一个轻量级的服务器&#xff0c;不仅名字很有趣&#xff0…

数据库概况

数据的基本概念&#xff1a; ①数据 描述事物的符号记录&#xff0c;包括数字&#xff0c;文字&#xff0c;图形&#xff0c;图形&#xff0c;声音&#xff0c;档案记录等以“记录”形式按统一的格式进行存储。 ②表 将不同的记录组织在一起 用来存储具体数据 ③数据库 表的…

I/O多路复用 select 、poll

前言 套接字通信并发如果我们服务器端想实现并发&#xff0c;有两种处理方式&#xff0c;第一种是通过多进程的方式来处理并发&#xff0c;第二种是通过多线程的方式来处理服务器端的并发。 【问题】如果服务器端的程序只有一个线程&#xff0c;或者说只有一个进程&#xff0…

腾讯云新老用户优惠券免费领取方法分享

腾讯云优惠券是腾讯云的一种优惠方式&#xff0c;领券之后购买腾讯云相关产品可以享受优惠&#xff0c;下面给大家分享腾讯云新老用户优惠券免费领取方法&#xff0c;助力大家轻松上云&#xff01; 一、腾讯云优惠券领取方法 腾讯云新用户优惠券&#xff1a;点此领取 腾讯云老…

ERP系统解析:全面了解企业资源规划系统

在当今快节奏的商业环境中&#xff0c;有效的企业资源计划&#xff08;Enterprise Resource Planning&#xff0c;简称ERP&#xff09;系统对于组织的成功运营至关重要。ERP系统是一种集成管理软件&#xff0c;通过整合各个部门的信息和流程&#xff0c;实现资源的高效利用和运…

11.redis持久化

1.redis持久化 Redis的所有数据都是保存在内存中&#xff0c;因此redis重启后数据就丢失了&#xff0c;所以需要不定期的通过异步方式保存到磁盘上(这称为“半持久化模式”)&#xff1b;或者把每一次数据变化都写入到一个append only file(aof)里面(这称为“全持久化模式”)。 …

Exploring Lottery Prompts for Pre-trained Language Models

Exploring Lottery Prompts for Pre-trained Language Models 文章链接 清深的工作&#xff0c;比较有意思的一篇。作者先给出假设&#xff0c;对于分类问题&#xff0c;在有限的语料空间内总能找到一个prompt让这个问题分类正确&#xff0c;作者称之为lottery prompt。为此&…

Rust常用加密算法

哈希运算(以Sha256为例) main.rs: use crypto::digest::Digest;use crypto::sha2::Sha256;fn main() { let input "dashen"; let mut sha Sha256::new(); sha.input_str(input); println!("{}", sha.result_str());} Cargo.toml: [package]n…

Python中实例方法、类方法、静态方法的区别与作用

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 Python中至少有三种比较常见的方法类型&#xff0c;即实例方法&#xff0c;类方法、静态方法。 它们是如何定义的呢&#xff1f;如何调用的呢&#xff1f;它们又有何区别和作用呢&#xff1f;且看下文。 首先&#xf…

pdf.js构建时,报Cannot read property ‘createChildCompiler‘ of undefined #177的解决方法

在本地和CI工具进行构建时&#xff0c;报如下错误。 Cannot read property createChildCompiler of undefined #177解决方法&#xff1a; 找到vue.config.js&#xff0c;在 module.exports {parallel: false, //新增的一行chainWebpack(config) {....config.module.rule(&…

MySQL数据库管理高级语句

数据表高级操作 复制表及内容 #复制表及内容create table copy1 like zh1 ; #复制格式&#xff0c;通过LIKE方法&#xff0c;复制zh1表结构生成copy1表 insert into copy1 select * from zh1; #备份内容 克隆表 克隆表&#xff0c;将数据表的数据记录生成到新的表中C…

5个能提高效率的在线设计工具,真的很好用!

随着设计工作的不断变化&#xff0c;能在线使用的设计工具就成了设计师更需要的设计工具&#xff0c;它体量小&#xff0c;使用方便&#xff0c;不尽能帮助设计师完成正常的设计工作&#xff0c;还可以给设计师带来舒适的使用体验&#xff0c;今天本文收集整理了5款好用的在线设…

免费照片转绘画风格软件-FotoSketcher

FotoSketcher一款免费照片转绘画风格软件&#xff0c;只需点击几下鼠标即可自动将照片转换为艺术作品。支持从铅笔素描到水彩画或油画、钢笔画、墨水画、抽象艺术和卡通画&#xff0c;有 20 多种不同的风格可供选择&#xff0c;工具还可以修改原始照片增强对比度、锐化、简化图…

hive问题总结

往往用了很久的函数却只知道其单一的应用场景&#xff0c;本文将不断完善所遇到的好用的hive内置函数。 1.聚合函数或者求最大最小值函数搭配开窗函数使用可以实现滑动窗口 例&#xff1a; SELECT event,time,session_id,COLLECT_LIST(event) OVER (PARTITION BY session_id …

日本核污水今日入海,这帮黑客怒了!

自2011年东日本大地震以来&#xff0c;日本谋划已久的福岛核电站核污水排海计划已于8月24日下午起正式施行&#xff0c;预计排污周期长达30年&#xff0c;整个海洋及其生物都有可能遭受难不可逆的毁灭性打击。 据现场媒体报道&#xff0c;经过17分钟的流淌&#xff0c;核污染水…

Arduino程序设计(四)按键消抖+按键计数

按键消抖按键计数 前言一、按键消抖二、按键计数1、示例代码2、按键计数实验 参考资料 前言 本文主要介绍两种按键控制LED实验&#xff1a;第一种是采用软件消抖的方法检测按键按下的效果&#xff1b;第二种是根据按键按下次数&#xff0c;四个LED灯呈现不同的流水灯效果。 一…

mysql的登录与退出

mysql是c/s架构&#xff0c;意味着同时要有客户端和服务端 1 找到客户端。mysql.exe的安装目录 打开命令行 2 输入对应的服务器的ip&#xff0c;如果是本地&#xff0c;就是Localhost&#xff0c;如果是远程服务器&#xff0c;那就输入对应ip/域名。并且指定mysql监听的端口 …