【C++boost::asio网络编程】有关异步读写api的笔记

news2025/1/15 13:21:25

异步读写api

  • 异步写操作
    • async_write_some
    • async_send
  • 异步读操作
    • async_read_some
    • async_receive

  定义一个Session类,主要是为了服务端专门为客户端服务创建的管理类

class Session {
public:
    Session(std::shared_ptr<asio::ip::tcp::socket> socket);
    void Connect(const asio::ip::tcp::endpoint& ep);
private:
    std::shared_ptr<asio::ip::tcp::socket> _socket;
};

异步写操作

  在介绍异步写之前,需要先封装一个Node结构,用来管理发送的数据

class MsgNode
{
	friend class Session;
public:
	MsgNode(const char* msg, int total_len)
		:_total_len(total_len)
		,_cur_len(0)
	{
		_msg = new char[total_len];
		memcpy(_msg, msg, total_len);
	}
	MsgNode(int total_len)
		:_total_len(total_len)
		,_cur_len(0)
	{
		_msg = new char[_total_len];
	}
	~MsgNode()
	{
		delete[] _msg;
	}
private:
	char* _msg;
	int _total_len;
	int _cur_len;
};

  其中,_msg表示要发送的数据,_cur_len表示已经发送的长度,而_total_len表示数据的总长度

async_write_some

在这里插入图片描述
  通过源码可以看出,async_write_some需要两个参数。第一个参数是buffer结构的数据,用来放需要发送的数据;第二个参数是一个回调函数,这个回调函数又有两个参数,一个是用来存放错误码的对象,另一个是无符号整数(这个无符号整数代表的就是当前具体发送数据的大小)
  当调用完async_write_some之后(即一次异步写操作结束之后),系统会调用这个回调函数。

void Session::WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode> node)
{
	if (node->_cur_len + bytes_transferred <= node->_total_len)
	{
		node->_cur_len += bytes_transferred;
		this->_socket->async_write_some(boost::asio::buffer(node->_msg + node->_cur_len, node->_total_len - node->_cur_len),
			std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node));
	}
}

void Session::WriteToSocketErr(const std::string& buf)
{
	_send_node = std::make_shared<MsgNode>(buf.c_str(), buf.size());
	_socket->async_write_some(boost::asio::buffer(buf.c_str(), buf.size()),
		std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node));
}

  在以上代码中,先在WriteToSocketErr函数中创建一个消息结点,然后调用async_write_some将数据发送出去。当一次写操作结束之后。系统会将错误码和已写入数据的长度作为参数给回调函数。

if (node->_cur_len + bytes_transferred <= node->_total_len)

  在回调函数中判断是否已经将数据全部发送出去了,如果没有,则更新_cur_len,然后继续执行异步发送操作
  但是,以上代码逻辑中存在一个漏洞。在异步执行的逻辑中,代码调用的顺序是不确定的。
  举个例子,当需要连续两次发送hello world

//连续两次调用
WriteToSocketErr("HelloWorld");
WriteToSocketErr("HelloWorld");

  可能会发生第一次进行写入的时候只写入了Hello,这时按照逻辑需要执行回调函数,当在回调函数中发现数据并没有发送完全,于是再次调用async_write_some想继续写入World,但此时第二次调用WriteToSocketErr("HelloWorld");中,已经提前一步调用了async_write_some并将数据全部写完,然后才轮到第一次发送时的回调函数将剩下的World继续发完。这最终导致的结果时对方收到的数据为HelloHelloWorldWorld.
  为了确保发送顺序的问题,可以在Session类中定义一个队列用来管理需要发送的结点和i一个布尔类型变量用来表示当前是否有数据正在被发送(初始化为false)

class Session{
public:
	Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
	:_socket(socket)
	,_send_pending(false)
	{}
    void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
    void WriteToSocket(const std::string &buf);
private:
    std::queue<std::shared_ptr<MsgNode>> _send_queue;
    std::shared_ptr<asio::ip::tcp::socket> _socket;
    bool _send_pending;
};

  此时再对写操作进行改进

void Session::WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	std::shared_ptr<MsgNode>&node = _send_queue.front();
	node->_cur_len += bytes_transferred;
	if (node->_cur_len + bytes_transferred < node->_total_len)//还没有发送完
	{
		_socket->async_write_some(boost::asio::buffer(node->_msg + node->_cur_len, node->_total_len - bytes_transferred),
			std::bind(&WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}
	_send_queue.pop();
	if (_send_queue.empty())
	{
		_send_pending = false;
	}
	else
	{
		std::shared_ptr<MsgNode>& node = _send_queue.front();
		_socket->async_write_some(boost::asio::buffer(node->_msg, node->_total_len),
			std::bind(&Session::WriteCallBack, std::placeholders::_1, std::placeholders::_2));
	}
}

void Session::WriteToSocket(const std::string& buf)
{
	_send_queue.push(std::make_shared<MsgNode>(buf.c_str(), buf.size()));
	if (_send_pending)//当前有消息正在发
	{
		return;
	}
	_socket->async_write_some(boost::asio::buffer(buf.c_str(), buf.size()),
		std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
	_send_pending = true;
}

  在WriteToSocket函数中,先不着急将数据立马发送出去,而是将数据节点放入到发送队列中,然后判断当前是否有数据正在发送,如果有就返回避免冲突;没有就直接调用async_write_some,在回调函数中,永远都是取出队首的结点进行发送,如果判断队首的元素数据已经发送完了就pop掉,并且检查队列中是否还有需要发送的元素:如果有,继续执行发送逻辑;如果没有就将_send_pending置为false表示当前已经没有数据正在发送了。

async_send

  async_send的作用是直接将所有数据全部发送完,代码逻辑也比async_write_some要简单一些

void Session::WriteAllToSocket(const std::string& buf)
{
	_send_queue.push(std::make_shared<MsgNode>(buf.c_str(), buf.size()));
	if (_send_pending)
	{
		return;
	}
	_socket->async_send(boost::asio::buffer(buf.c_str(), buf.size()),
		std::bind(&Session::WriteAllCallBck, this, std::placeholders::_1, std::placeholders::_2));
	_send_pending = true;
}

void Session::WriteAllCallBck(const boost::system::error_code& ec, std::size_t bytes_tranferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	_send_queue.pop();
	if (_send_queue.empty())
	{
		_send_pending = false;
	}
	else
	{
		std::shared_ptr<MsgNode>& node = _send_queue.front();
		_socket->async_send(boost::asio::buffer(node->_msg, node->_total_len),
			std::bind(&Session::WriteAllCallBck, this, std::placeholders::_1, std::placeholders::_2));
	}
}

注意
  async_sendasync_write_some不要放在一起使用,因为async_send底层还是多次调用的async_write_some。如果一起使用,还是会引发数据冲突的问题

异步读操作

  为了准备读操作,需要在Session类中添加数据结点_recv_node和一个布尔变量_recv_pending

class Session
{
public:
	Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
		:_socket(socket)
		,_send_pending(false)
		,_recv_pending(false)
	{}
	void Connect(boost::asio::ip::tcp::endpoint& ep);
	void WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode>);
	void WriteToSocketErr(const std::string& buf);

	void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
	void WriteToSocket(const std::string& buf);

	void WriteAllToSocket(const std::string& buf);
	void WriteAllCallBck(const boost::system::error_code& ec, std::size_t bytes_tranferred);

	void ReadFromSocket();
	void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);

	void ReadAllFromSocket();
	void ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);

private:
	std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
	std::shared_ptr<MsgNode> _send_node;
	std::queue<std::shared_ptr<MsgNode>> _send_queue;
	std::shared_ptr<MsgNode> _recv_node;
	bool _recv_pending;
	bool _send_pending;
};

  由于接收的数据在TCP缓冲区里面已经是排好序了的,所以并不需要队列来维护顺序

async_read_some

其实异步读和异步写的逻辑类似,这里就不多介绍了

void Session::ReadFromSocket()
{
	if (_recv_pending)
	{
		return;
	}
	_recv_node = std::make_shared<MsgNode>(RECVSIZE);
	_socket->async_read_some(boost::asio::buffer(_recv_node->_msg, _recv_node->_total_len),
		std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
	_recv_pending = true;
}
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	if (_recv_node->_cur_len + bytes_transferred < _recv_node->_total_len)
	{
		_recv_node->_cur_len += bytes_transferred;
		_socket->async_read_some(boost::asio::buffer(_recv_node->_msg + _recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len),
			std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}
	_recv_pending = false;
}

async_receive

void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	if (_recv_node->_cur_len + bytes_transferred < _recv_node->_total_len)
	{
		_recv_node->_cur_len += bytes_transferred;
		_socket->async_read_some(boost::asio::buffer(_recv_node->_msg + _recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len),
			std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}
	_recv_pending = false;
}

void Session::ReadAllFromSocket()
{
	if (_recv_pending)
	{
		return;
	}
	_recv_node = std::make_shared<MsgNode>(RECVSIZE);
	_socket->async_receive(boost::asio::buffer(_recv_node->_msg, _recv_node->_total_len),
		std::bind(&Session::ReadAllCallBack, this, std::placeholders::_1, std::placeholders::_2));
	_recv_pending = true;
}
void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	_recv_pending = false;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2252663.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

atcoder abc 382 lazy_tag线段树

A Daily Cookie 代码&#xff1a; #include <bits/stdc.h> using namespace std;typedef long long ll;int main() {int n, d;cin >> n >> d;string s;cin >> s;int cnt d;for(auto t: s) if(t .) cnt ;cout << min(n, cnt); } B Daily Co…

【NLP 8、normalization、sigmoid,softmax归一化函数】

"燃尽最后的本能&#xff0c;意志力会带你杀出重围" —— 24.12.2 1. Normalization&#xff08;归一化&#xff09; 归一化是将数据转换为具有统一尺度的形式&#xff0c;通常用于数据预处理阶段。常见的归一化方法包括 Min-Max归一化、Z-Score 归一化和 L…

深入学习指针(5)!!!!!!!!!!!!!!!

文章目录 1.回调函数是什么&#xff1f;2.qsort使用举例2.1使用qsort函数排序整形数据2.2使用sqort排序结构数据 3.qsort函数的模拟实现 1.回调函数是什么&#xff1f; 回调函数就是⼀个通过函数指针调⽤的函数。 如果你把函数的指针&#xff08;地址&#xff09;作为参数传递…

Matlab Simulink 电力电子仿真-单相电压型半桥逆变电路分析

目录 一、单相电压型半桥逆变电路仿真模型 1.电路模型 2.电路模型参数 二、仿真分析 三、总结 1.优缺点 2.应用场景 一、单相电压型半桥逆变电路仿真模型 1.电路模型 单相电压型半桥逆变电路是一种常见的逆变电路&#xff0c;主要用于将直流电源转换为交流电源。 &…

《Vue零基础入门教程》第十五课:样式绑定

往期内容 《Vue零基础入门教程》第六课&#xff1a;基本选项 《Vue零基础入门教程》第八课&#xff1a;模板语法 《Vue零基础入门教程》第九课&#xff1a;插值语法细节 《Vue零基础入门教程》第十课&#xff1a;属性绑定指令 《Vue零基础入门教程》第十一课&#xff1a;事…

做异端中的异端 -- Emacs裸奔之路5: 条件反射式移动

移动命令使用频率非常之高&#xff0c;只要方法多一个小小的弯路&#xff0c;对使用体验影响都很大。 克服移动上的难度&#xff0c;离掌握Emacs就不远了。 在不安装其它包的情况下&#xff0c;Emacs就可以&#xff1a; 以行为单位移动&#xff1a; C-n/C-p以段落为单位移动&…

基于单片机的WIFI、语音、储存、时钟、闹钟、定位系统

所有仿真详情导航&#xff1a; PROTEUS专栏说明-CSDN博客 目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机&#xff0c;采用DS1302时钟模块&#xff0c;通过LCD1602显示实时时间&#xff0c;也可以储存时间在AT2DC02中&#xff0c…

【阅读记录-章节4】Build a Large Language Model (From Scratch)

文章目录 4. Implementing a GPT model from scratch to generate text4.1 Coding an LLM architecture4.1.1 配置小型 GPT-2 模型4.1.2 DummyGPTModel代码示例4.1.3 准备输入数据并初始化 GPT 模型4.1.4 初始化并运行 GPT 模型 4.2 Normalizing activations with layer normal…

[创业之路-158]:《BLM战略规划》BLM业务业务领先模型与从战略到执行-模型(DSTE)

目录 一、BLM业务业务领先模型 - 整体框架 1.1 战略制定 1.2 战略执行 二、战略领导力&#xff1a;洞察&#xff08;看的远、看得深&#xff09;、决断&#xff08;看得清&#xff09;、执行力&#xff08;做得快、做得好&#xff09; 2.1 基本框架&#xff1a;战略制定、…

配置 Android Studio cursor/vscode 环境(切换 Flutter 版本)

系统环境变量 Path Android Studio 配置 Java 系统变量 Path PS&#xff1a;

Qt几何数据类型:QPoint类型详解(基础向)

目录 QPoint类 QPoint的构造 QPoint公有方法 isNull() rx() ry() setX() setY() toCGPoint() toPointF() transposed() x() y() manhattanLength() 各类重载运算符 QPoint的静态方法 dotproduct() QPoint类 在 Qt 框架中&#xff0c;QPoint 是一个简单且常用的类&…

外卖开发(三)开发笔记——AOP实现实现公共字段填充、主键回显、抛异常和事务管理

外卖开发&#xff08;三&#xff09;开发笔记 一、AOP实现实现公共字段填充&#xff08;减少重复工作&#xff09;实现思路自定义注解AutoFill自定义切面AutoFillAspect在Mapper接口上添加AutoFill注解 二、主键回显情况三、抛异常 和 事务管理 一、AOP实现实现公共字段填充&am…

Flutter 1.2:flutter配置gradle环境

1、在android的模块中进行gradle环境配置 ①在 gradle-wrapper.properties文件中将url配置为阿里云镜像&#xff0c;因为gradle的服务器在国外&#xff0c;国内下载非常慢&#xff0c;也可在官网进行下载 gradle版本下载 gradle版本匹配 阿里云镜像gradle下载 可以通过复制链…

神经网络入门实战:(九)分类问题 → 神经网络模型搭建模版和训练四步曲

(一) 神经网络模型搭建官方文档 每一层基本都有权重和偏置&#xff0c;可以仔细看官方文档。 pytorch 官网的库&#xff1a;torch.nn — PyTorch 2.5 documentation Containers库&#xff1a;用来搭建神经网络框架&#xff08;包含所有的神经网络的框架&#xff09;&#xff1b…

达梦数据库文件故障的恢复方法

目录 1、概述 1.1 概述 1.2 环境介绍 2、使用备份集的恢复方法 2.1 实验准备 2.2 误删除“用户表空间数据文件” 2.3 误删除SYSTEM.DBF 2.4 误删除ROLL.DBF 2.5 REDO日志文件 3、无备份集的恢复方法 3.1 误删除“表空间数据文件” 3.2误删除控制文件 3.3 误删除RO…

uniapp进阶技巧:如何优雅地封装request实例

在uniapp开发过程中&#xff0c;合理封装网络请求是提高代码质量和开发效率的关键。本文将介绍一种更为优雅的封装方式&#xff0c;通过创建一个request实例来管理不同类型的HTTP请求。 一、准备工作 在开始封装之前&#xff0c;请确保你的项目中已经安装了uniapp开发环境&…

45 基于单片机的信号选择与温度变化

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机&#xff0c;采用DS18B20检测温度&#xff0c;通过三种LED灯代表不同状态。 采用DAC0832显示信号脉冲&#xff0c;通过8位数码管显示温度。 信号脉冲可以根据两个按键分别调整为正弦…

yagmail邮件发送库:如何用Python实现自动化邮件营销?

&#x1f3a5; 作者简介&#xff1a; CSDN\阿里云\腾讯云\华为云开发社区优质创作者&#xff0c;专注分享大数据、Python、数据库、人工智能等领域的优质内容 &#x1f338;个人主页&#xff1a; 长风清留杨的博客 &#x1f343;形式准则&#xff1a; 无论成就大小&#xff0c;…

付费版-多媒体云转码视频处理工具-付费系统完整源码搭建-先到先得-本文带完整搭建步骤-优雅草央千澈

付费版-多媒体云转码视频处理工具-付费系统完整源码搭建-先到先得-本文带完整搭建步骤-优雅草央千澈 环境 linuxnginxphp7.1mysql5.6 安装步骤 以下适用于宝塔已经安装好的情况 1.上传源码到你的网站目录 2.访问你的域名&#xff0c;按操作提示进行安装配置&#xff08;如…

java基础概念46-数据结构1

一、引入 List集合的三种实现类使用了不同的数据结构&#xff01; 二、数据结构的定义 三、常见的数据结构 3-1、栈 特点&#xff1a;先进后出&#xff0c;后进先出。 java内存容器&#xff1a; 3-2、队列 特点&#xff1a;先进先出、后进后出。 栈VS队列-小结 3-3、数组 3-…