boost asio异步服务器(4)处理粘包

news2024/10/7 11:40:21

粘包的产生

当客户端发送多个数据包给服务器时,服务器底层的tcp接收缓冲区收到的数据为粘连在一起的。这种情况的产生通常是服务器端处理数据的速率不如客户端的发送速率的情况。比如:客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!

tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送。

粘包处理

处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议(消息id+消息长度+消息内容)。

tlv

TLV(Type-Length-Value)是一种通信协议,用于在通信中传输结构化数据。它将数据分为三个部分:类型(Type)、长度(Length)和值(Value),每个部分都以固定的格式进行编码和解码。

但是我下边的格式并不是标准的tlv格式,而是采用的lv模式,即只包含length和value。

完善消息节点

class MsgNode {
public:
    //这里的构造方法主要方便后续调用Send接口构造消息节点
	MsgNode(char* msg, short data_len) : total_len(data_len + HEAD_LENGTH), cur_len(0) {
		_data = new char[total_len + 1];
		memcpy(_data, &data_len, HEAD_LENGTH);
		memcpy(_data + HEAD_LENGTH, msg, data_len);
		_data[total_len] = '\0';
	}
    //这里的构造方法则是用于在进行切包过程中构造处理数据的节点
	MsgNode(short data_len) :total_len(data_len), cur_len(0) {
		_data = new char[total_len + 1];
	}
    //Clear方法是用于清理节点的数据,避免多次构造析构节点
	void Clear() {
		memset(_data, 0, total_len);
		cur_len = 0;
	}
	~MsgNode() {
		delete[] _data;
	}
private:
	friend class Session;
	//表示已经处理的数据长度
	int cur_len;
	//表示处理数据的总长度
	int total_len;
	//表示数据的首地址
	char* _data;
};

完善两个构造函数和添加Clear函数

1、第一个构造方法主要方便后续调用Send接口构造消息节点
2、第二个构造方法则是用于在进行切包过程中构造处理数据的节点
3、Clear方法是用于清理节点的数据,避免多次构造析构节点

session类完善

_recv_msg_node用于存放收到数据包中的数据

_b_head_parse表示头部是否解析完成

_recv_head_node用于存放接收到数据包中的头部信息

完善hand_read回调函数

void Session::handle_read(const boost::system::error_code& ec, size_t bytes_transferred,
	std::shared_ptr<Session> self_shared) {
	if (ec) {
		std::cout << "read error, error code: " << ec.value() <<
			" read message: " << ec.message() << std::endl;
		Close();
		server_->ClearSession(uuid);
	}
	else {
		PrintRecvData(data_, bytes_transferred);
		std::chrono::milliseconds dura(2000);
		std::this_thread::sleep_for(dura);
		
		//已经移动的字节数
		int copy_len = 0;
		while (bytes_transferred) {
			//头部尚未解析完成
			if (!_b_head_parse) {
				//收到的数据不足头部大小,这种情况很少发生
				if (bytes_transferred + _recv_head_node->cur_len < HEAD_LENGTH) {
					memcpy(_recv_head_node->_data + _recv_head_node->cur_len, data_ + copy_len, bytes_transferred);
					_recv_head_node->cur_len += bytes_transferred;
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));
					return;
				}

				//走到这里,说明收到的数据大于头部,可能是一个粘连的数据包,但是首先需要将头部节点两字节读完

				//处理头部剩余未复制的长度
				int head_remain = HEAD_LENGTH - _recv_head_node->cur_len;
				if (head_remain) {
					memcpy(_recv_head_node->_data + _recv_head_node->cur_len, data_ + copy_len, head_remain);
					//更新已处理的数据
					copy_len += head_remain;
					/*
					* 这里不能更新头部节点的cur_len。
					* 因为
					* 1、当一次进来cur_len等于0,处理之后的偏移量copy_len就为2
					* 2、当头部未读取完成,后续读取会修正为正确的偏移量(但是种情况很少发生)
					* 3、之后的读取头部信息都会发生覆盖
					*/
					//_recv_head_node->cur_len += head_remain;
					bytes_transferred -= head_remain;
				}

				//获取头部数据
				short data_len = 0;
				memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
				std::cout << "data_len is " << data_len << std::endl;

				if (data_len > MAX_LENGTH) {
					std::cout << "invalid data length is " << data_len << std::endl;
					server_->ClearSession(uuid);
					return;
				}

				//头部节点处理完成,就可以开始处理数据域的数据节点
				_recv_msg_node = std::make_shared<MsgNode>(data_len);

				//消息长度小于头部规定长度,说明数据未收全,则先将消息放到接收节点中
				if (bytes_transferred < data_len) {
					memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, bytes_transferred);
					_recv_msg_node->cur_len += bytes_transferred;
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));

					//表示头部处理完成,当下次进来的时候,就会直接跳过头部处理环节
					_b_head_parse = true;
					return;
				}

				//走到这里表示消息长度大于头部规定长度,这里可能是一个完整包,也可能是多个粘连的包
				memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, data_len);
				_recv_msg_node->cur_len += data_len;
				copy_len += data_len;
				bytes_transferred -= data_len;
				_recv_msg_node->_data[_recv_msg_node->total_len] = '\0';
				std::cout << "receive data is: " << _recv_msg_node->_data << std::endl;

				//调用send发送给客户端
				Send(_recv_msg_node->_data, _recv_msg_node->total_len);

				//继续轮询处理下个未处理的数据,重置数据包和头部解析的情况
				_b_head_parse = false;
				_recv_msg_node->Clear();
				//说明这不是一个多个粘连的数据包
				if (bytes_transferred <= 0) {
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));
					return;
				}
				//走到这里说明这就是一个多个粘连的数据包
				continue;
			}

			//走到这里就说明头部是已经解析完成的,是处理数据未收全的情况
			int remain_msg = _recv_msg_node->total_len - _recv_msg_node->cur_len;
			//说明收到的数据仍然不足头部规定大小的情况
			if (bytes_transferred < remain_msg) {
				memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, bytes_transferred);
				_recv_msg_node->cur_len += bytes_transferred;
				memset(data_, 0, MAX_LENGTH);
				sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
					std::bind(&Session::handle_read, this,
						std::placeholders::_1, std::placeholders::_2, self_shared));
				return;
			}

			//走到这里说明收到的数据是大于等于头部规定大小的,接收到的数据可能是个完整的数据包,也可能多个粘连的数据包
			memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, remain_msg);
			_recv_msg_node->cur_len += remain_msg;
			bytes_transferred -= remain_msg;
			copy_len += remain_msg;
			_recv_msg_node->_data[_recv_msg_node->total_len] = '\0';
			std::cout << "receive data is: " << _recv_msg_node->_data << std::endl;

			//处理完当前数据包的分割后,调用send接口向客户端发送回去
			Send(_recv_msg_node->_data, _recv_msg_node->total_len);

			//继续轮询处理下个数据包,重置接收数据节点和头部解析情况
			_b_head_parse = false;
			_recv_msg_node->Clear();
			//说明数据包并不是粘连的
			if (bytes_transferred <= 0) {
				memset(data_, 0, MAX_LENGTH);
				sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
					std::bind(&Session::handle_read, this,
						std::placeholders::_1, std::placeholders::_2, self_shared));
				return;
			}
			//走到这里说明数据包是粘连的
			continue;	
		}
	}
}

这里hand_read函数的完善逻辑代码比较长,其中的注释给的比较详细,需要各位仔细读。但是逻辑可能头一两次读可能还是会有些蒙,多读几遍可能就会好得多。

这里还是得必要得说一下,我们都知道异步读写函数得回调函数中的参数bytes_transferred表示已经读取到的字节数,但是我们在这里还是需要对这些已经读到的数据进行处理。其中定义copy_len表示已经处理的字节数,bytes_transferred则表示为还未处理的数据(尽管已经被读取到了,但是还是尚未被处理,需要好好理解下)。

这里在session类中还定义了两个宏,MAX_LENGTH表示数据包的最大长度,就是1024*2字节。HEAD_LENGTH表示头部长度,就是2字节。

这里我也画了一个逻辑图供大家梳理这里的代码逻辑,希望能对大家理解有帮助。

粘包现象的测试

在session类中写一个打印函数,在每次触发读事件回调的时候调用下这个函数。这里打印的是tcp缓冲区的数据,boost asio从tcp已经是已经做了将tcp缓冲区的数据拿出来的,所以这里打印即可。

为了制造粘包现象,我们可以让服务器端隔2s处理一次读写,而客户端则不停的发送和读取就能制造出粘包现象了。下边是提供的客户端的代码。

#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using namespace std;
using namespace boost::asio::ip;
const int MAX_LENGTH = 1024 * 2;
const int HEAD_LENGTH = 2;
int main()
{
	//测试粘包现象客户端
	try {
		//创建上下文服务
		boost::asio::io_context   ioc;
		//构造endpoint
		tcp::endpoint  remote_ep(address::from_string("127.0.0.1"), 1234);
		tcp::socket  sock(ioc);
		boost::system::error_code   error = boost::asio::error::host_not_found;
		sock.connect(remote_ep, error);
		if (error) {
			cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
			return 0;
		}

		thread send_thread([&sock] {
			for (;;) {
				this_thread::sleep_for(std::chrono::milliseconds(2));
				const char* request = "hello world!";
				size_t request_length = strlen(request);
				char send_data[MAX_LENGTH] = { 0 };
				memcpy(send_data, &request_length, 2);
				memcpy(send_data + 2, request, request_length);
				boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2));
			}
			});

		thread recv_thread([&sock] {
			for (;;) {
				this_thread::sleep_for(std::chrono::milliseconds(2));
				cout << "begin to receive..." << endl;
				char reply_head[HEAD_LENGTH];
				size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH));
				short msglen = 0;
				memcpy(&msglen, reply_head, HEAD_LENGTH);
				char msg[MAX_LENGTH] = { 0 };
				size_t  msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));

				std::cout << "Reply is: ";
				std::cout.write(msg, msglen) << endl;
				std::cout << "Reply len is " << msglen;
				std::cout << "\n";
			}
			});

		send_thread.join();
		recv_thread.join();
	}
	catch (std::exception& e) {
		std::cerr << "Exception: " << e.what() << endl;
	}
	return 0;
}

现象如下图,测试环境Windows visual studio 

完整服务端代码:codes-C++: C++学习 - Gitee.com

这里的echo服务器实现了粘包的处理,但是在不同的平台下仍存在收发数据异常的问题,其根本原因就是平台大小端的差异。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1876022.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【scau大数据原理】期末复习——堂测题

一、集群安装知识 启动集群的命令start-all.sh位于 Hadoop安装目录的sbin文件夹 目录下。 bin文件夹下包含常见的Hadoop,yarn命令&#xff1b;sbin命令下包含集群的启动、停止命令。 启动集群的命令start-all.sh包含 同时启动start-dfs.sh和start-yarn.sh 功能。…

大模型ReAct:思考与工具协同完成复杂任务推理

ReAct: Synergizing Reasoning and Acting in Language Models Github&#xff1a;https://github.com/ysymyth/ReAct 一、动机 人类的认知通常具备一定的自我调节&#xff08;self-regulation&#xff09;和策略制定&#xff08;strategization&#xff09;的能力&#xff0…

ONLYOFFICE8.1版本桌面编辑器——功能测评

一、编辑DOCX 相信大家都有写word文档的经历&#xff0c;不知道大家是不是跟我一样&#xff0c;感觉做一个word不难&#xff0c;但想做好一个word却很麻烦&#xff0c;功能太多&#xff0c;看的人眼花缭乱&#xff0c;有时候一个功能要找很久&#xff0c;甚至有的功能用一辈子都…

mybatis核心配置介绍

mybatis核心配置 【mybatis全局配置介绍】 ​ mybatis-config.xml&#xff0c;是MyBatis的全局配置文件&#xff0c;包含全局配置信息&#xff0c;如数据库连接参数、插件等。整个框架中只需要一个即可。 1、mybatis全局配置文件是mybatis框架的核心配置&#xff0c;整个框架…

Unity | Shader基础知识(第十五集:透明效果)

目录 一、前言 二、素材准备 三、准备基础代码 四、准备基础场景 五、SurfaceOutput结构体 六、透明度 七、渲染顺序 八、选错的后果 九、Tags之渲染顺序 十、Cull&#xff08;正面和反面渲染&#xff09; 十一、代码汇总 十二、作者的碎碎念 一、前言 因为shader…

python-斐波那契数列

[题目描述] 斐波那契数列是指这样的数列&#xff1a;数列的第一个和第二个数都为 1&#xff0c;接下来每个数都等于前面 2个数之和。 给出一个正整数 a&#xff0c;要求斐波那契数列中第 a 个数是多少。输入&#xff1a; 第 1 行是测试数据的组数 n&#xff0c;后面跟着 n 行输…

UnityShader SDF有向距离场简单实现

UnityShader SDF有向距离场简单实现 前言项目场景布置连连看画一个圆复制一个圆计算修改shader参数 鸣谢 前言 突然看到B站的一个教程&#xff0c;还不错&#xff0c;记录一下 项目 场景布置 使用ASE连连看&#xff0c;所以先要导入Amplify Shader Editor 连连看 画一个…

有人问周鸿祎: 学历不重要,为什么360只要985和211?

关注、星标公众号&#xff0c;直达精彩内容 有人问周鸿祎:你说学历不重要&#xff0c;为什么360招聘的人才只要985和211&#xff1f;他说这个事情&#xff0c;我专门问了我们的人力资源&#xff0c;我们的干品分为校园招聘和社会招聘 校园招聘的话会看文凭 社会招聘的话&#x…

性能优化:Java垃圾回收机制深度解析 - 让你的应用飞起来!

文章目录 一、什么是垃圾回收二、Java 内存区域划分三、垃圾回收算法1. 标记-清除&#xff08;Mark-Sweep&#xff09;算法2. 复制&#xff08;Copying&#xff09;算法3. 标记-整理&#xff08;Mark-Compact&#xff09;算法4. 分代收集&#xff08;Generational Collecting&a…

光储充一体化解决方案

慧哥充电桩开源平台V2.5.2_ 【源码下载】 https://liwenhui.blog.csdn.net/article/details/134773779?spm1001.2014.3001.5502 本文从光储充的原理以及总体系统等角度&#xff0c;全方位介绍光储充一体化解决方案实例。 一、典型应用场景 针对整县区域光伏项目&#xff0c;在…

综合评价类模型——突变级数法

含义 首先&#xff1a;对评价目标进行多层次矛盾分解其次&#xff1a;利用突变理论和模糊数学相结合产生突变模糊隶属函数再次&#xff1a;由归一公式进行综合量化运算最终&#xff1a;归一为一个参数&#xff0c;即求出总的隶属函数&#xff0c;从而对评价目标进行排序分析特点…

c++指针和引用之高难度(二)习题讲解

1.【单选题】 int a[4] { 1001,1002,1003,1004 }&#xff1b; int* p{ &a[1] }; p[1] ? A 1001 B 1002 C 1003 解析&#xff1a;这道题考察了指针和数组可以混用。p 指向了 数组 a[0] 的地址&#xff0c;也就是 1002 的地址&#xff0c;此时 *p p[0]…

DDPM pytorch 代码复现

本次只分享代码以及效果&#xff0c;后续更新原理 代码参考 deep_thought 先看动图效果 1.选择一个数据集 %matplotlib inline import matplotlib.pyplot as plt import numpy as np from sklearn.datasets import make_s_curve import torchs_curve, _ make_s_curve(10 **…

零基础STM32单片机编程入门(四)ADC详解及实战含源码视频

文章目录 一.概要二.STM32F103C8T6单片机ADC外设特点三.STM32单片机ADC内部结构图1.ADC相关引脚说明2.ADC通道分类3.触发源4.转换周期5.电压转换计算6.更精确电压转换计算 四.规则通道ADC采集信号流向1.单次转换模式2.连续转换模式 五.CubeMX配置一个ADC采集例程六.CubeMX工程源…

通天星CMSV6车载监控平台CompanyList信息泄露漏洞

1 漏洞描述 通天星CMSV6车载视频监控平台是东莞市通天星软件科技有限公司研发的监控平台,通天星CMSV6产品覆盖车载录像机、单兵录像机、网络监控摄像机、行驶记录仪等产品的视频综合平台。通天星科技应用于公交车车载、校车车载、大巴车车载、物流车载、油品运输车载、警车车…

风控图算法之中心性算法(小数据集Python版)

风控图算法之中心性算法&#xff08;小数据集Python版&#xff09; 图算法在金融风控领域的应用已经超越了传统的社区发现技术&#xff0c;这些技术曾被主要用于识别和分析欺诈性行为模式&#xff0c;例如黑产团伙。当前&#xff0c;一系列图统计算法&#xff0c;包括介数中心…

笔记本重装系统怎么操作? windows电脑重装系统,超实用的四种方法

重新安装操作系统是维护计算机性能和确保系统稳定运行的重要步骤。对于 Windows 笔记本用户而言&#xff0c;熟悉重装系统的方法可以帮助他们解决各种问题&#xff0c;从提高系统速度到修复软件故障。然而具体来讲&#xff0c;笔记本重装系统怎么操作呢&#xff1f;接下来&…

【01】Java代码如何运行

JRE: 包含Java虚拟机以及核心类库 JDK: 同样包含了JRE&#xff0c;并且附带了一系列开发、诊断工具 一、为什么Java要在虚拟机中运行 一、 Java语言特性&#xff1a;高级、语法复杂、抽象 Java语言-- 【编译器】 --> Java字节码 --【虚拟机】–> 实现 二、 托管环境 自…

正点原子rk3588编译sdk

1、编译SDK 1.1 安装 RK3588 Linux SDK .repo/repo/repo sync -l -j101.2 SDK 工程目录介绍 app&#xff1a;存放上层应用 app&#xff0c;包括 Qt 应用程序&#xff0c;以及其它的 C/C应用程序。 buildroot&#xff1a;基于 buildroot 开发的根文件系统。 debian&#xff1…

AIGC对图片行业的影响分析!

前言 自从去年生成式AI火起来之后&#xff0c;不论是文字领域还是图片领域受到的冲击都非常大。比如说SD和Midjourney的爆火&#xff0c;不止是创作者&#xff0c;还有交易平台和使用方&#xff0c;都在发生变化。 AIGC自2023年全面进入大家视野&#xff0c;对各行各业造成了或…