boost asio异步服务器(4)处理粘包问题tlv

news2024/11/24 4:45:43

粘包的产生

当客户端发送多个数据包给服务器时,服务器底层的tcp接收缓冲区收到的数据为粘连在一起的。这种情况的产生通常是服务器端处理数据的速率不如客户端的发送速率的情况。比如:客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!

tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送。

粘包处理

处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议(消息id+消息长度+消息内容)。

tlv

TLV(Type-Length-Value)是一种通信协议,用于在通信中传输结构化数据。它将数据分为三个部分:类型(Type)、长度(Length)和值(Value),每个部分都以固定的格式进行编码和解码。

但是我下边的格式并不是标准的tlv格式,而是采用的lv模式,即只包含length和value。

完善消息节点

class MsgNode {
public:
    //这里的构造方法主要方便后续调用Send接口构造消息节点
	MsgNode(char* msg, short data_len) : total_len(data_len + HEAD_LENGTH), cur_len(0) {
		_data = new char[total_len + 1];
		memcpy(_data, &data_len, HEAD_LENGTH);
		memcpy(_data + HEAD_LENGTH, msg, data_len);
		_data[total_len] = '\0';
	}
    //这里的构造方法则是用于在进行切包过程中构造处理数据的节点
	MsgNode(short data_len) :total_len(data_len), cur_len(0) {
		_data = new char[total_len + 1];
	}
    //Clear方法是用于清理节点的数据,避免多次构造析构节点
	void Clear() {
		memset(_data, 0, total_len);
		cur_len = 0;
	}
	~MsgNode() {
		delete[] _data;
	}
private:
	friend class Session;
	//表示已经处理的数据长度
	int cur_len;
	//表示处理数据的总长度
	int total_len;
	//表示数据的首地址
	char* _data;
};

完善两个构造函数和添加Clear函数

1、第一个构造方法主要方便后续调用Send接口构造消息节点
2、第二个构造方法则是用于在进行切包过程中构造处理数据的节点
3、Clear方法是用于清理节点的数据,避免多次构造析构节点

session类完善

_recv_msg_node用于存放收到数据包中的数据

_b_head_parse表示头部是否解析完成

_recv_head_node用于存放接收到数据包中的头部信息

完善hand_read回调函数

void Session::handle_read(const boost::system::error_code& ec, size_t bytes_transferred,
	std::shared_ptr<Session> self_shared) {
	if (ec) {
		std::cout << "read error, error code: " << ec.value() <<
			" read message: " << ec.message() << std::endl;
		Close();
		server_->ClearSession(uuid);
	}
	else {
		PrintRecvData(data_, bytes_transferred);
		std::chrono::milliseconds dura(2000);
		std::this_thread::sleep_for(dura);
		
		//已经移动的字节数
		int copy_len = 0;
		while (bytes_transferred) {
			//头部尚未解析完成
			if (!_b_head_parse) {
				//收到的数据不足头部大小,这种情况很少发生
				if (bytes_transferred + _recv_head_node->cur_len < HEAD_LENGTH) {
					memcpy(_recv_head_node->_data + _recv_head_node->cur_len, data_ + copy_len, bytes_transferred);
					_recv_head_node->cur_len += bytes_transferred;
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));
					return;
				}

				//走到这里,说明收到的数据大于头部,可能是一个粘连的数据包,但是首先需要将头部节点两字节读完

				//处理头部剩余未复制的长度
				int head_remain = HEAD_LENGTH - _recv_head_node->cur_len;
				if (head_remain) {
					memcpy(_recv_head_node->_data + _recv_head_node->cur_len, data_ + copy_len, head_remain);
					//更新已处理的数据
					copy_len += head_remain;
					/*
					* 这里不能更新头部节点的cur_len。
					* 因为
					* 1、当一次进来cur_len等于0,处理之后的偏移量copy_len就为2
					* 2、当头部未读取完成,后续读取会修正为正确的偏移量(但是种情况很少发生)
					* 3、之后的读取头部信息都会发生覆盖
					*/
					//_recv_head_node->cur_len += head_remain;
					bytes_transferred -= head_remain;
				}

				//获取头部数据
				short data_len = 0;
				memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
				std::cout << "data_len is " << data_len << std::endl;

				if (data_len > MAX_LENGTH) {
					std::cout << "invalid data length is " << data_len << std::endl;
					server_->ClearSession(uuid);
					return;
				}

				//头部节点处理完成,就可以开始处理数据域的数据节点
				_recv_msg_node = std::make_shared<MsgNode>(data_len);

				//消息长度小于头部规定长度,说明数据未收全,则先将消息放到接收节点中
				if (bytes_transferred < data_len) {
					memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, bytes_transferred);
					_recv_msg_node->cur_len += bytes_transferred;
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));

					//表示头部处理完成,当下次进来的时候,就会直接跳过头部处理环节
					_b_head_parse = true;
					return;
				}

				//走到这里表示消息长度大于头部规定长度,这里可能是一个完整包,也可能是多个粘连的包
				memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, data_len);
				_recv_msg_node->cur_len += data_len;
				copy_len += data_len;
				bytes_transferred -= data_len;
				_recv_msg_node->_data[_recv_msg_node->total_len] = '\0';
				std::cout << "receive data is: " << _recv_msg_node->_data << std::endl;

				//调用send发送给客户端
				Send(_recv_msg_node->_data, _recv_msg_node->total_len);

				//继续轮询处理下个未处理的数据,重置数据包和头部解析的情况
				_b_head_parse = false;
				_recv_msg_node->Clear();
				//说明这不是一个多个粘连的数据包
				if (bytes_transferred <= 0) {
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));
					return;
				}
				//走到这里说明这就是一个多个粘连的数据包
				continue;
			}

			//走到这里就说明头部是已经解析完成的,是处理数据未收全的情况
			int remain_msg = _recv_msg_node->total_len - _recv_msg_node->cur_len;
			//说明收到的数据仍然不足头部规定大小的情况
			if (bytes_transferred < remain_msg) {
				memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, bytes_transferred);
				_recv_msg_node->cur_len += bytes_transferred;
				memset(data_, 0, MAX_LENGTH);
				sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
					std::bind(&Session::handle_read, this,
						std::placeholders::_1, std::placeholders::_2, self_shared));
				return;
			}

			//走到这里说明收到的数据是大于等于头部规定大小的,接收到的数据可能是个完整的数据包,也可能多个粘连的数据包
			memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, remain_msg);
			_recv_msg_node->cur_len += remain_msg;
			bytes_transferred -= remain_msg;
			copy_len += remain_msg;
			_recv_msg_node->_data[_recv_msg_node->total_len] = '\0';
			std::cout << "receive data is: " << _recv_msg_node->_data << std::endl;

			//处理完当前数据包的分割后,调用send接口向客户端发送回去
			Send(_recv_msg_node->_data, _recv_msg_node->total_len);

			//继续轮询处理下个数据包,重置接收数据节点和头部解析情况
			_b_head_parse = false;
			_recv_msg_node->Clear();
			//说明数据包并不是粘连的
			if (bytes_transferred <= 0) {
				memset(data_, 0, MAX_LENGTH);
				sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
					std::bind(&Session::handle_read, this,
						std::placeholders::_1, std::placeholders::_2, self_shared));
				return;
			}
			//走到这里说明数据包是粘连的
			continue;	
		}
	}
}

这里hand_read函数的完善逻辑代码比较长,其中的注释给的比较详细,需要各位仔细读。但是逻辑可能头一两次读可能还是会有些蒙,多读几遍可能就会好得多。

这里还是得必要得说一下,我们都知道异步读写函数得回调函数中的参数bytes_transferred表示已经读取到的字节数,但是我们在这里还是需要对这些已经读到的数据进行处理。其中定义copy_len表示已经处理的字节数,bytes_transferred则表示为还未处理的数据(尽管已经被读取到了,但是还是尚未被处理,需要好好理解下)。

这里在session类中还定义了两个宏,MAX_LENGTH表示数据包的最大长度,就是1024*2字节。HEAD_LENGTH表示头部长度,就是2字节。

这里我也画了一个逻辑图供大家梳理这里的代码逻辑,希望能对大家理解有帮助。

粘包现象的测试

在session类中写一个打印函数,在每次触发读事件回调的时候调用下这个函数。这里打印的是tcp缓冲区的数据,boost asio从tcp已经是已经做了将tcp缓冲区的数据拿出来的,所以这里打印即可。

为了制造粘包现象,我们可以让服务器端隔2s处理一次读写,而客户端则不停的发送和读取就能制造出粘包现象了。下边是提供的客户端的代码。

#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using namespace std;
using namespace boost::asio::ip;
const int MAX_LENGTH = 1024 * 2;
const int HEAD_LENGTH = 2;
int main()
{
	//测试粘包现象客户端
	try {
		//创建上下文服务
		boost::asio::io_context   ioc;
		//构造endpoint
		tcp::endpoint  remote_ep(address::from_string("127.0.0.1"), 1234);
		tcp::socket  sock(ioc);
		boost::system::error_code   error = boost::asio::error::host_not_found;
		sock.connect(remote_ep, error);
		if (error) {
			cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
			return 0;
		}

		thread send_thread([&sock] {
			for (;;) {
				this_thread::sleep_for(std::chrono::milliseconds(2));
				const char* request = "hello world!";
				size_t request_length = strlen(request);
				char send_data[MAX_LENGTH] = { 0 };
				memcpy(send_data, &request_length, 2);
				memcpy(send_data + 2, request, request_length);
				boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2));
			}
			});

		thread recv_thread([&sock] {
			for (;;) {
				this_thread::sleep_for(std::chrono::milliseconds(2));
				cout << "begin to receive..." << endl;
				char reply_head[HEAD_LENGTH];
				size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH));
				short msglen = 0;
				memcpy(&msglen, reply_head, HEAD_LENGTH);
				char msg[MAX_LENGTH] = { 0 };
				size_t  msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));

				std::cout << "Reply is: ";
				std::cout.write(msg, msglen) << endl;
				std::cout << "Reply len is " << msglen;
				std::cout << "\n";
			}
			});

		send_thread.join();
		recv_thread.join();
	}
	catch (std::exception& e) {
		std::cerr << "Exception: " << e.what() << endl;
	}
	return 0;
}

现象如下图,测试环境Windows visual studio 

完整服务端代码:codes-C++: C++学习 - Gitee.com

这里的echo服务器实现了粘包的处理,但是在不同的平台下仍存在收发数据异常的问题,其根本原因就是平台大小端的差异。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1861659.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一份简单的海外问卷,改变经济现状

在许多人看来&#xff0c;赚钱似乎总是与资金和技术密切相关。然而&#xff0c;即使没有丰富的资金和高超的技术&#xff0c;仍然有机会赚取可观的收入。 首先&#xff0c;需要明确的是&#xff0c;赚钱并非完全依赖于物质资本和技术能力。在这个充满机遇的时代&#xff0c;选…

鸿蒙开发网络管理:【@ohos.net.webSocket (WebSocket连接)】

WebSocket连接 说明&#xff1a; 本模块首批接口从API version 6开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 使用WebSocket建立服务器与客户端的双向连接&#xff0c;需要先通过[createWebSocket]方法创建[WebSocket]对象&#xff0c;然后通…

python基础篇(4):range语句

1 功能介绍 range语句的功能是获得一个数字序列&#xff08;可迭代类型的一种&#xff09; 2 语法 语法1&#xff1a; range(num) 获取一个从0开始&#xff0c;到num结束的数字序列&#xff08;不含num本身&#xff09; 如range(5)取得的数据是&#xff1a;[0, 1, 2, 3, 4…

springboot多数据源应用,A服务依赖于B服务jar包,A服务和B服务业务数据分别入自己的库如何做?

上一节我们简单阐述了springboot多数据源如何配置。在实际的业务场景中我们常常遇到A服务依赖于B服务jar包&#xff0c;A服务和B服务业务数据分别入自己的库中。为何要这么做呢&#xff1f;比如B服务是日志SDK&#xff0c;A服务集成B服务来实现记录日志的功能&#xff0c;但是日…

安卓直装植物大战僵尸杂交版V2.1版完美运行

在移动游戏的世界里&#xff0c;植物大战僵尸无疑是一款深受玩家喜爱的经典游戏。如今&#xff0c;随着技术的发展和玩家需求的变化&#xff0c;植物大战僵尸杂交版V2.1版应运而生&#xff0c;为安卓用户带来了全新的游戏体验。 这一全新版本在原有游戏的基础上进行了多项创新…

TOP 5免费音频转换器,轻松搞定音频格式转换(白嫖党必备)

你是否正在寻找能提升音乐体验并满足各种需求的好用的免费音频转换器呢&#xff1f;如果是的话&#xff0c;那么请不用再找了&#xff01;在本篇文章中&#xff0c;我们汇总了顶级的音频格式转换软件&#xff0c;并且列出了其所兼容的操作系统及适合人群&#xff0c;从而满足你…

【PyTorch函数解析】einsum的用法示例

一、前言 einsum 是一个非常强大的函数&#xff0c;用于执行张量&#xff08;Tensor&#xff09;运算。它的名称来源于爱因斯坦求和约定&#xff08;Einstein summation convention&#xff09;&#xff0c;在PyTorch中&#xff0c;einsum 可以方便地进行多维数组的操作和计算…

PCL笔记二 之VS环境配置(不同版本Debug+Release编译)

PCL笔记二 之VS环境配置&#xff08;不同版本DebugRelease编译&#xff09; PCL官网&#xff1a;https://github.com/PointCloudLibrary/pcl/releases众所周知&#xff0c;PCL是一个用于点云处理并且依赖不少三方库的一个算法库&#xff0c;同时在编译配置环境时也很复杂&…

【ONLYOFFICE8.1桌面编辑器】强势来袭—— 一款全面的办公软件套件

在日常工作和学习中&#xff0c;我们经常需要处理各种文档、表格和演示文稿。一款功能强大、易于使用的办公软件成为我们提高工作效率、便捷沟通和展示想法的得力助手。 而ONLYOFFICE 8.1桌面编辑器正是一款全面、高效的办公软件&#xff0c;集合了Word、PPT、Excel的功能&…

如何设置windows计划任务

如何设置windows计划任务 前言&#xff1a;在工作过程中写了一个python脚本&#xff0c;用于调用jira接口查询bug单数量&#xff0c;想要在本地定时任务执行&#xff0c;每天发送到钉钉群提醒&#xff0c;写下操作步骤用于记录。 1. 准备 Python 脚本 确保你的 Python 脚本已…

暗影精灵8Pro声音没有了,这个方法可以解决,亲测有效!

这个OMEN by HP Gaming Laptop 16-k0xxx Windows 10 Sound Driver Mod &#xff0c;真的解决了我的大问题&#xff01; 如果你的暗影精灵8 Pro酷睿版突然变得哑巴了&#xff0c;扬声器和麦克风都发不出声音&#xff0c;那可能是声卡驱动出了问题。 别担心&#xff0c;我也是个…

Android简介

1. Android简介 Android是一种基于Linux内核的自由及开放源代码的操作系统。最初是由安迪鲁宾(Andy Rubin)开发的一款相机操作系统。2005年8月被Google收购。2007年11月&#xff0c;Google与84家硬件制造商、软件开发商及电信营运商组建开放手机联盟共同研发改良Android系统。…

8.12 矢量图层面要素单一符号使用七(随机标记填充)

文章目录 前言随机标记填充&#xff08;Random Marker Fill&#xff09;QGis设置面符号为随机标记填充&#xff08;Random Marker Fill&#xff09;二次开发代码实现随机标记填充&#xff08;Random Marker Fill&#xff09; 总结 前言 本章介绍矢量图层线要素单一符号中使用随…

手机照片压缩到20k以内免费,这几款心动软件快收好!

在数字化时代&#xff0c;手机拍照已成为我们记录生活的重要方式之一。然而&#xff0c;高清的照片也意味着占用着越来越多的手机存储空间。如果你正在为手机内存告急而烦恼&#xff0c;那么这几款手机照片压缩神器或许能成为你的救星&#xff01;它们不仅可以将照片轻松压缩至…

Three.js——第一篇:部署以及基础代码创建场景、GUI调整样式

three.js官网&#xff1a;three.js docs 中文技术文档1&#xff1a;| 麒跃科技 中文技术文档2&#xff1a;3. 开发和学习环境&#xff0c;引入threejs | Three.js中文网 很多教程一开始要大家自己部署three.js的中文本地部署&#xff0c;我就不弄了&#xff0c;我弄了半天也没…

大厂薪资福利篇第四弹:字节跳动

欢迎来到绝命Coding&#xff01; 今天继续更新大家最关心的 大厂薪资福利系列&#xff01; 往期分享&#xff1a; 福利开水喝不完&#xff1f;大厂薪资福利篇&#xff01;美团 职场文化发源地&#xff1f;大厂薪资福利篇&#xff01;阿里巴巴 给这么多&#xff01;还能带宠物上…

Adams Flex模块功能介绍

通过该教程对Adams Flex模块有基本的认知&#xff0c;为以后使用柔性体进行刚柔耦合做好基础学习。 有需要购买的可以邮箱&#xff08;digitaltwins126.com&#xff09;或站内信联系&#xff0c;谢谢&#xff01;

机器学习之数学基础(七)~过拟合(over-fitting)和欠拟合(under-fitting)

目录 1. 过拟合与欠拟合 1.1 Preliminary concept 1.2 过拟合 over-fitting 1.3 欠拟合 under-fitting 1.4 案例解析&#xff1a;黑天鹅 1. 过拟合与欠拟合 1.1 Preliminary concept 误差 经验误差&#xff1a;模型对训练集数据的误差。泛化误差&#xff1a;模型对测试…

基于SpringBoot的“智慧食堂”管理系统设计与实现

你好呀&#xff0c;我是计算机学姐码农小野&#xff01;如果有相关需求&#xff0c;可以私信联系我。 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBootVue 工具&#xff1a;IDEA/Eclipse、Navicat、Maven 系统展示 首页 用户管理界面 菜品…

超炫酷, 不用学前端也能自己做网页!这个Python库,3分钟内复刻GPT WEB应用

大家好&#xff0c;我是海鸽。 今天&#xff0c;我要和大家分享如何将请求 GPT 的案例&#xff0c;快速“复刻”成 GPT 网页版。这不仅简单&#xff0c;而且对于我们这些后端开发者来说&#xff0c;简直是福音&#xff01; 先睹为快 看看这个界面&#xff0c;是不是感觉很熟…