网络编程(6)——发送的时序性,全双工通信

news2024/9/27 15:15:29

六、day6

在上午学习完如何通过c++11特性模拟伪闭包实现连接的安全回收之后,下午学习如何封装一个发送接口,该接口能保证发送的时序性(异步发送时TCP底层缓冲区可能无法将所有数据一次发出去,如果这时候再次调用异步发送,就可能造成数据错乱)。实现的关键在于:多次发送时,异步的发送要保证回调触发后再次发送才能确保数据是有序的(队列)。

文章开始前将前面文章中提到的Server和Session类分成CServer.h和CSession.h两个文件,注意两个文件的依赖关系:

CSession.h

#pragma once
#include <iostream>
#include <boost/asio.hpp>
#include <map>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

using boost::asio::ip::tcp;
using std::cout;
using std::cin;
using std::endl;

class CServer;

class MsgNode {
public:
	int _total_len; // 数据的总长度
	int _cur_len; // 已经处理的长度(已读的长度或者已写的长度)
	char* _msg; // 数据域首地址

	MsgNode(const char* msg, int total_len) :_total_len(total_len), _cur_len(0) { // 构造写节点
		_msg = new char[total_len];
		memcpy(_msg, msg, total_len);
	}
	MsgNode(int total_len) : _total_len(total_len), _cur_len(0) { // 构造读节点
		_msg = new char[total_len];
	}
	~MsgNode() {
		delete[] _msg;
	}
};

class CSession:public std::enable_shared_from_this<CSession>
{
private:
	tcp::socket _socket; // 处理客户端读写的套接字
	enum { max_length = 1024 };
	char _data[max_length]; 

	// headle回调函数
	void headle_read(const boost::system::error_code& error, size_t bytes_transferred,
		std::shared_ptr<CSession> _self_shared);
	void haddle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared);
	std::string _uuid;
	CServer* _server;
public:
	CSession(boost::asio::io_context& ioc, CServer* server) : _socket(ioc), _server(server){
		// random_generator是函数对象,加()就是函数,再加一个()就是调用该函数
		boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
		_uuid = boost::uuids::to_string(a_uuid);
	}
	tcp::socket& Socket() { return _socket; }
	const std::string& GetUuid() const { return _uuid; }
	void Start();
};

CSession.cpp

#include "CSession.h"
#include "CServer.h"

void CSession::Start() {
	memset(_data, 0, max_length); // 缓冲区清零
	// 从套接字中读取数据,并绑定回调函数headle_read
	_socket.async_read_some(boost::asio::buffer(_data, max_length),
		// 这里可以将shared_ptr<Session>(this)给bind绑定吗?
		// 不可以,会造成多个智能指针绑定同一块内存的问题
		std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2,
			shared_from_this()));
}

// 
void CSession::headle_read(const boost::system::error_code& error, size_t bytes_transferred,
	std::shared_ptr<CSession> _self_shared) {
	if (!error) {
		cout << "server receive data is " << _data << endl;
		boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred),
			std::bind(&CSession::haddle_write, this, std::placeholders::_1, _self_shared));
	}
	else {
		cout << "read error" << endl;
		_server->ClearSession(_uuid);
	}
}

void CSession::haddle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared) {
	if (!error) {
		memset(_data, 0, max_length);
		_socket.async_read_some(boost::asio::buffer(_data, max_length),
			std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
	}
	else {
		cout << "write error" << error.value() << endl;
		_server->ClearSession(_uuid);
	}
}

CServer.h

#pragma once
#include "CSession.h"

class CServer
{
private:
	void start_accept();  // 启动一个acceptor
	// 当acceptor接收到连接后启动该函数
	void handle_accept(std::shared_ptr<CSession> new_session, const boost::system::error_code& error);
	boost::asio::io_context& _ioc;
	tcp::acceptor _acceptor;
	std::map<std::string, std::shared_ptr<CSession>> _sessions;
public:
	CServer(boost::asio::io_context& ioc, short port);
	void ClearSession(std::string uuid);
};

CServer.cpp

#include "CServer.h"


// 初始化服务器对象,绑定 I/O 上下文和监听的端口,并启动服务器
CServer::CServer(boost::asio::io_context& ioc, short port) : _ioc(ioc),
_acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {
	cout << "Server start success, on port: " << port << endl;
	// 开始异步地接受客户端连接请求。服务器启动后就进入等待客户端连接的状态
	start_accept();
}

void CServer::ClearSession(std::string uuid) {
	_sessions.erase(uuid);
}

void CServer::start_accept() {
	// make_shared分配并构造一个 std::shared_ptr,_ioc, this是传给Session的参数
	std::shared_ptr<CSession> new_session = std::make_shared<CSession>(_ioc, this);
	// 开始一个异步接受操作,当new_session的socket与客户端连接成功时,调用回调函数handle_accept
	// 为什么new_session在右括号结束后仍不结束,而是bind后计数加一?
	// new_session通过bind绑定时,new_session的计数就会加一,所以在bind后,new_session的生命周期和
	// 新构造函数的生命周期相同,因为新生成的函数对象引用了new_session(new_session通过值传递的方式被复制构造函数使用)。
	// 所以只要新构造的bind回调函数没有被调用、移除,new_session的声明周期就始终存在,所以new_session不会随着'}'的结束而释放。
	_acceptor.async_accept(new_session->Socket(), std::bind(&CServer::handle_accept, this, new_session,
		std::placeholders::_1));
}

// 当handle_accept触发时,也就是start_accept的回调函数被触发,当该回调函数结束后从队列中移除后,new_session的引用计数减一
void CServer::handle_accept(std::shared_ptr<CSession> new_session, const boost::system::error_code& error) {
	// 如果没有错误(error 为 false),调用 new_session->Start() 来启动与旧客户端的会话
	if (!error) {
		new_session->Start();
		_sessions.insert(std::make_pair(new_session->GetUuid(), new_session));
	}
	else cout << "session accept failed, error is " << error.what() << endl;
	// 无论当前连接是否成功,都重新调用 start_accept(),以便服务器能够继续接受下一个新客户端的连接请求。
	// 服务器始终保持在监听状态,随时准备接受新连接
	start_accept();
}

1)数据节点设计

首先,使用网络编程(3)中的数据节点,作为异步服务器数据的存储节点,放在CSession.h文件中

爱吃土豆:网络编程自学(3)1 赞同 · 0 评论文章

class MsgNode {
public:
	int _total_len; // 数据的总长度
	int _cur_len; // 已经处理的长度(已读的长度或者已写的长度)
	char* _msg; // 数据域首地址

	MsgNode(const char* msg, int total_len) :_total_len(total_len), _cur_len(0) { // 构造写节点
		_msg = new char[total_len];
		memcpy(_msg, msg, total_len);
	}
	MsgNode(int total_len) : _total_len(total_len), _cur_len(0) { // 构造读节点
		_msg = new char[total_len];
	}
	~MsgNode() {
		delete[] _msg;
	}
};

2)封装发送接口

服务器的发送接口一般是在逻辑线程调用,所以调用发送线程的接口和asio回调的网络线程不在一个线程,这个发送队列就存在两个线程的共同访问,所以需增加一个保证发送队列的安全性,同时新增一个发送接口Send

void Send(char* msg,  int max_length);
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;

以及send的实现:

发送接口里判断发送队列是否为空,如果不为空说明有数据未发送完,需要将数据放入队列,然后返回。如果发送队列为空,则说明当前没有未发送完的数据,将要发送的数据放入队列并调用async_write函数发送数据。

这个函数确保了在多线程环境下,发送操作的有序性与安全性。通过锁来保护发送队列,通过队列来缓存多个待发送的消息,并使用异步写操作 async_write 进行非阻塞的发送。函数还确保了只有一个异步写操作会在某一时刻进行,避免同时多次发送操作对同一套接字的竞争访问。

void CSession::Send(char* msg, int max_length) {
	bool pending = false; // 发送标志,true时有未完成的发送操作,false为空
	// 使用lock_guard锁住_send_lock,确保_send_lock(发送队列)的访问的线程安全的
	// 锁的存在确保了多个线程不会同时修改发送队列
	std::lock_guard<std::mutex> lock(_send_lock);
	// 判断队列是否有未完成的发送操作
	if (_send_que.size() > 0) {
		pending = true;
	}
	_send_que.push(std::make_shared<MsgNode>(msg, max_length)); // 将发送消息存储至队列
	if (pending) { // 如果有未完成的发送,直接返回
		return;
	}
	// 异步发送
	boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length),
		std::bind(&CSession::haddle_write, this, std::placeholders::_1, shared_from_this()));
} // 当'}'结束后,_send_lock解锁,发送队列解锁

3)修改读和写回调

写回调(实现了异步写操作完成后的处理逻辑,在写入操作成功时从发送队列中移除已发送的数据,并继续处理队列中的下一个数据包;如果写入操作失败,则处理错误并清除会话):

// 异步写操作完成后的回调处理函数
void CSession::haddle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared) {
	if (!error) { // 检查异步写是否成功
		std::lock_guard<std::mutex> lock(_send_lock); // 加锁保护发送队列
		_send_que.pop(); // 移除上一个已发送的消息(send函数中的异步发)
		if (!_send_que.empty()) { // 若队列不为空,处理下一个消息
			auto& msgnode = _send_que.front();
			boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_msg, msgnode->_total_len),
				std::bind(&CSession::haddle_write, this, std::placeholders::_1, _self_shared));
		}
	}
	else {
		std::cout << "handle write failed, error is " << error.what() << endl;
		_server->ClearSession(_uuid);
	}
}

读回调:

因为服务器一般是全双工通信,所以要一直监听对端发送的数据,在每次收到数据后继续绑定监听事件

void CSession::headle_read(const boost::system::error_code& error, size_t bytes_transferred,
	std::shared_ptr<CSession> _self_shared) {
	if (!error) {
		cout << "server receive data is " << _data << endl;
		Send(_data, bytes_transferred); // 将收到的消息回传
		memset(_data, 0, max_length); // 缓冲区清零
		_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&CSession::headle_read, this,
			std::placeholders::_1, std::placeholders::_2, _self_shared));
	}
	else {
		std::cout << "handle read failed, error is " << error.what() << endl;
		_server->ClearSession(_uuid);
	}
}

1.服务器的发送接口一般是在逻辑线程调用,调用发送线程的接口和asio回调的网络线程不在一个线程,发送队列存在两个线程的共同访问,如何解释这句话?

1)逻辑线程

  • 服务器程序可能有多个线程执行不同的任务,其中有一个或多个线程专门负责业务逻辑(通常称为逻辑线程)。这些线程负责处理如游戏逻辑、业务处理等高层次的操作。
  • 发送数据的请求通常由这些逻辑线程发起,也就是说,逻辑线程会调用服务器的发送接口来准备或触发向客户端发送数据。

2)网络线程

  • 使用Boost.Asio这样的异步I/O库时,实际处理网络通信的部分是由网络线程(通常是由Boost.Asio提供的线程)来负责。这些线程处理所有的网络事件和I/O操作,比如读写操作完成时的回调函数。
  • 当逻辑线程调用发送接口时,实际的数据发送操作是交由网络线程处理的,因此存在逻辑线程网络线程之间的协作。这两个线程不是同一个线程,存在并发访问的问题。

3)发送队列

  • 为了实现异步发送,服务器通常会有一个发送队列,用于暂存即将发送的数据包。逻辑线程将数据放入这个队列中,而网络线程则从队列中取出数据并通过网络发送出去。
  • 因为这个发送队列是由两个不同的线程(逻辑线程和网络线程)共同访问的,因此会有并发问题。如果没有进行适当的同步控制(如加锁或使用无锁队列),可能会导致数据竞争(data race)、不一致或崩溃等问题。

2."std::lock_guard<std::mutex> lock(_send_lock)"是如何保护发送队列的,什么时候解除保护?

std::lock_guard<std::mutex> 是一个类模板,它会在创建时锁住传递的互斥量(mutex),并在离开作用域时自动解锁。锁的保护通过以下机制实现:

std::lock_guard<std::mutex> lock(_send_lock);
  • 这行代码创建了一个 std::lock_guard 对象 lock,并将 _send_lock 传递给它,表示要锁定 _send_lock 互斥量。
  • 一旦 lock 对象被创建,构造函数会立即锁定 _send_lock,从而确保在该作用域内,其他线程无法同时访问受该锁保护的资源。
  • 锁定后,直到当前代码块结束前,其他线程无法获取 _send_lock,从而保证了临界区(即锁定代码之后的代码块)的线程安全。

std::lock_guard<std::mutex> 的锁定持续到该对象的生命周期结束。当 lock 对象超出其作用域时(即代码块结束时),它会自动调用其析构函数,从而释放互斥锁 _send_lock。

3.锁的机制

锁是用于协调对共享资源(比如发送队列)的访问,确保在多线程环境中只有一个线程能够在某一时刻访问该资源,当一个线程在访问发送队列时:

std::lock_guard<std::mutex> lock(_send_lock);
_send_que.push(...);  // 或者 _send_que.pop()
  • 通过 lock_guard 锁定 _send_lock,只有当前线程能进入这段代码,并操作 _send_que
  • 如果其他线程也想访问队列,它们会在获取 _send_lock 时被阻塞,直到当前线程释放锁。这就防止了多个线程同时修改队列的可能性。

举例:

void CSession::Send(char* msg, int max_length) {
    std::lock_guard<std::mutex> lock(_send_lock); // 锁定互斥锁
    _send_que.push(std::make_shared<MsgNode>(msg, max_length)); // 对共享队列的操作
    // 其他代码
}
  • 当线程A调用 Send() 函数并进入这段代码时,它加锁 _send_lock,防止其他线程B同时修改 _send_que。
  • 线程B调用 Send() 函数时,会发现 _send_lock 被线程A持有,线程B必须等待线程A释放锁后,才能获得锁并访问队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2170327.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

el-table+el-form实现表单校验和解决不垂直居中导致的问题

el-tableel-form实现表单校验 1.实现el-table的表单校验 关键点123 2.解决不垂直居中导致的问题 问题效果图 解决方案 .item-align-center {display: inline-flex; }

OJ在线评测系统 原生Java代码沙箱核心实现流程三 整理封装输出结果 拿到程序执行时间(stopwatch类) 和 运行内存

我们在之前的操作中已经拿到程序进行了编译和运行 接下来我们要将我们的结果输出 整理输出 // 4.收集整理输出结果 ExecuteCodeResponse executeCodeResponse new ExecuteCodeResponse(); ArrayList<String> outputList new ArrayList<>();for (ExecuteMessage…

Library介绍(一)

之前和大家介绍过cell delay是如何计算的。那么&#xff0c;本文将着重和大家介绍一些timing lib中的各个参数定义是什么意思。会分以下几个部分介绍&#xff1a;库属性描述、时序弧介绍、环境描述、单元描述。之前介绍的cell delay template就是单元描述中的一部分。本文主要介…

网络安全入门必备:这四点你做到了吗?

数据的鸿沟无疑是显而易见的&#xff0c;网络安全领域亟需熟练的专业人员。 组织在这方面投入巨大资金&#xff0c;但挑战依旧存在。 根据最新的研究&#xff0c;有64%的违规行为是导致机构过去一年收入损失及/或罚款的主要原因。 60%的组织在努力招聘网络安全人才&#xff…

【市场解读】新能源汽车换代问题

参考文献&#xff1a;百分点舆情中心《新能源汽车换代问题消费者情绪洞察报告》 行业背景 新能源汽车市场竞争加剧&#xff0c;车企不断推陈出新政府发布《汽车以旧换新补贴实施细则》&#xff0c;激励市场发展 *对汽车换代问题媒体关注度与网友讨论度高&#xff0c;正面声量…

电脑退域后系统黑屏

之前加入域时迁移了账号系统&#xff0c;导致退域后本地账号系统没了东西黑屏但能看到鼠标。也登不了域账号了一顿慌张&#xff08;操作如下&#xff09; 解决&#xff1a;又加回了域哈哈哈 重启电脑按F8进不去安全模式&#xff0c;找不到触发时间... winr打开运行&#xff0c;…

都说网络安全缺口那么大,但为何招聘数量却不多?总算明白了!

为啥网安领域缺口多达300多万人&#xff0c;但网安工程师也就是白帽黑客却很少&#xff0c;难道又是砖家在忽悠人&#xff1f; 原因主要为这三点: 首先是学校的原因&#xff0c;很多学校网络安全课程用的还都是十年前的老教材&#xff0c;教学脱离社会需求&#xff0c;实操技能…

【Python报错已解决】TypeError: expected Tensor as element 1 in argument 0, but got int

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

NHANES数据(复杂调查数据)亚组交互函数2.3版(P for interaction)发布---用于一键生成交互效应表

写在前面的话&#xff0c;本函数只支持NHANES数据(复杂调查数据)的逻辑回归和线性回归&#xff0c;其他类型均不支持&#xff0c;请注意甄别&#xff0c;电子产品&#xff0c;买错不能退换。 在SCI文章中&#xff0c;交互效应表格&#xff08;通常是表五&#xff09;能为文章锦…

多类别物体检测系统源码分享

多类别物体检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer V…

“领航猿1号” 正式更名为 “AGI舰长”

亲爱的朋友们&#xff0c;很高兴的告诉大家&#xff1a; 我各个平台的账号昵称正式 由“领航猿1号” 更名为 “AGI舰长” 为什么更名&#xff1a; 为了更好的更专注的为大家提供关于“AI大模型全栈”的分享&#xff0c;特此以 AI 为关键元素更名账号名称&#xff0c;大家可以…

企业内网知识问答库小程序源码系统 收录好+排名高 带完整的安装代码包以及搭建部署教程

系统概述 企业内网知识问答库小程序源码系统是一款集知识收集、整理、检索与分享于一体的综合解决方案。它基于现代Web技术和小程序框架开发&#xff0c;旨在为企业内部员工提供一个便捷、高效的知识交流平台。该系统不仅支持文本、图片、视频等多种形式的内容输入&#xff0c…

国际版短剧系统开发,海外多语言切换短剧APP源码部署上架

一、背景与需求 1. 背景介绍 随着全球化进程的加速和移动互联网的普及&#xff0c;短剧作为一种新型娱乐形式在全球范围内迅速走红。海外短剧系统是针对这一市场需求而开发的&#xff0c;旨在为全球观众提供高质量的短剧内容&#xff0c;并通过多样化的平台和服务&#xff0c…

禁止吸烟监测系统 基于图像处理的吸烟检测系统 YOLOv7

吸烟是引发火灾的重要原因之一。烟头在未熄灭的情况下&#xff0c;其表面温度可达200℃-300℃&#xff0c;中心温度甚至能高达700℃-800℃。在易燃、易爆的生产环境中&#xff0c;如化工厂、加油站、仓库等&#xff0c;一个小小的烟头就可能引发灾难性的火灾&#xff0c;造成巨…

【前端样式】Sweetalert2简单用法

1、 先安装sweetalert2库&#xff1a; npm install sweetalert2 2、引用SweetAlert2 库&#xff1a; import Swal from sweetalert2 &#xff1b; 3、代码拷过去直接去测试&#xff0c;vue代码 <template><div><el-button style"color: #C03639" clic…

【计算机网络 - 基础问题】每日 3 题(二十八)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏&…

Kafka学习笔记(一)Linux环境基于Zookeeper搭建Kafka集群、Kafka的架构

文章目录 1 Kafka简介1.1 什么是Kafka1.2 Kafka的应用场景1.3 Kafka的优势 2 搭建Kafka集群2.1 搭建Zookeeper集群2.1.1 上传并解压安装包2.1.2 修改配置文件2.2.3 创建dataDir和myid文件2.2.4 分发到另外两个节点2.2.5 修改node-02节点、node-03节点的配置文件和myid文件2.2.6…

【原创教程】西门子_部件手动模式FB块编辑

1、软件配置 ①软件配置 名称 版本 博图 V16 2、建立FB块 在编辑手动程序前应该建立手动程序的FB块&#xff0c;FB块的建立内容如下图所示 ①FB块的输入接口 Input:FB块的输入接口&#xff0c;将下拉列表中的数据应用于该FB块所编辑的程序中。 NO&#xff1a;当前部件…

数据科学 - 字符文本处理

1. 字符串的基本操作 1.1 结构操作 1.1.1 拼接 • 字符串之间拼接 字符串之间的拼接使用进行字符串的拼接 a World b Hello print(b a) • 列表中的字符串拼接 将以分隔符‘,’为例子 str [apple,banana] print(,.join(str)); • 字符串中选择 通过索引进行切片操…

一个 Java 语言简化处理 PDF 的框架,提供了一套简单易用的 API 接口,满足多样化需求又能简化开发流程的处理方案(附教程)

前言 当前市面上处理 PDF 文件的工具众多&#xff0c;但它们往往存在一定的局限性&#xff0c;比如复杂交互、功能单一等问题。尤其对于那些需要频繁生成或编辑 PDF 文档的应用场景来说&#xff0c;找到一个既能满足多样化需求又能简化开发流程的处理方案显得尤为重要。那么&a…