网络编程(18)——使用asio协程实现并发服务器

news2024/10/11 23:42:20

十八、day18

到目前为止,我们以及学习了单线程同步/异步服务器、多线程IOServicePool和多线程IOThreadPool模型,今天学习如何通过asio协程实现并发服务器

并发服务器有以下几种好处:

  • 协程比线程更轻量,创建和销毁协程的开销较小,适合高并发场景
  • 协程通常在单线程中运行,避免了多线程带来的资源竞争和同步问题,从而减少了内存使用
  • 将回调函数改写为顺序调用,让异步的函数能够以同步的方式写出来的同时不降低性能,提高开发效率
  • 协程调度比线程调度更轻量化,因为协程是运行在用户空间的,线程切换需要在用户空间和内核空间切换

首先需将C++语言标准换为C++20标准,协程是在C++20之后引入的新标准

1. 官方案例

asio官网提供了一个协程并发编程的案例,如下

#include <iostream>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;

awaitable<void> echo(tcp::socket socket) {
    try {
        char data[1024];
        for (;;) {
            std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);
            co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
        }
    }
    catch (std::exception& e) {
        std::cout << "Echo exception is " << e.what() << std::endl;
    }
}

awaitable<void> listener() {
    auto executor = co_await this_coro::executor;
    tcp::acceptor acceptor(executor, { tcp::v4(), 10086 });
    for (;;) {
        tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
        co_spawn(executor, echo(std::move(socket)), detached);
    }
}

int main()
{
    try {
        boost::asio::io_context io_context(1); // 1被用于提供有关所需并发级别的提示
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&](auto, auto) { // 处理几个信号就传入几个参数,这里使用auto自动推断
            io_context.stop();
            });
        co_spawn(io_context, listener(), detached);
        io_context.run();

    }
    catch (std::exception& e) {
        std::cout << "Exception is " << e.what() << std::endl;
    }
}

a. 声明

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;
  • awaitable :用于定义可以在协程中使用的异步操作,可以通过 co_await 关键字等待异步任务的完成,使异步的函数能够以同步的方式写出来的同时不降低性能
  • co_spawn:用于启动新的协程的函数,可以用它来创建新的异步任务并在指定的执行上下文中运行
  • detached:指示器,表示创建的协程不需要等待其结果。使用 detached 后,协程会在后台独立运行
  • use_awaitable:适配器,指示以协程的方式使用 Boost.Asio 的异步操作,它使得异步操作可以与 co_await 关键字结合使用。适配器允许将异步操作的结果直接与协程的执行流结合,使得异步调用能够以同步的方式写出,从而避免了手动管理回调函数
  • co_await 关键字的作用:
    • 当协程遇到 co_await 时,它会挂起执行,直到被等待的异步操作完成。这允许当前线程释放 CPU,去处理其他任务或协程。
    • 一旦等待的操作完成,协程会自动恢复执行,继续从挂起的地方运行。这样可以避免复杂的回调地狱,提供更直观的控制流。
    • co_await 会自动获取异步操作的结果并将其返回给调用者。例如,如果等待的是一个返回值的异步操作,结果会被赋给相应的变量。
    • 如果在 co_await 等待的异步操作中发生异常,协程可以捕获这些异常,方便进行错误处理。

b. echo()

awaitable<void> echo(tcp::socket socket) {
    try {
        char data[1024];
        for (;;) {
            std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);
            co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
        }
    }
    catch (std::exception& e) {
        std::cout << "Echo exception is " << e.what() << std::endl;
    }
}

awaitable<void>类型允许函数在执行时可以被暂停和恢复,这使得它能够与 co_await 一起使用,所以函数返回类型必须是awaitable<void>。

echo 函数能够高效处理多个客户端连接而不阻塞线程,主要是因为:

  • echo 函数使用 socket.async_read_some 和 async_write 方法进行异步读写操作。这意味着当函数执行这些操作时,它不会阻塞当前线程,而是可以在等待 I/O 完成时让出控制权。
  • 使用协程和 co_await,当 I/O 操作挂起时,协程会被暂停并释放线程。这使得同一线程可以处理其他任务或更多的连接,而不需要为每个连接创建新的线程。
  • 服务器的主循环(io_context.run())会持续运行,处理所有已准备好的异步操作。这样一来,多个连接可以并发处理,而不需要多个线程同时活跃。
  • 当协程通过 co_await 等待 I/O 操作时,它不会占用 CPU 资源。主线程可以继续接受新的连接或处理其他已完成的操作,从而提高并发能力。

其中

std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);

该段代码使用co_await 关键字等待异步读取操作完成,并将读取的字节数存储到n中。和之前异步服务器异步操作需要绑定回调函数不同,这里通过协程实现的并发服务器读写通过co_await 关键字和use_awaitable适配器组合使用,会自动处理异步操作的结果。当调用 socket.async_read_some 时,协程会暂停执行,并在操作完成时恢复。这个机制隐藏了回调的复杂性,使得代码更简洁和易读。当异步操作完成时,协程会自动继续执行,并将结果传递给 n 。

co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);

同理,异步写函数也以同步的方式使用,不需要显示bind回调函数,co_await 关键字会等待异步读取操作完成,而适配器use_awaitable允许将异步操作的结果直接与协程的执行流结合

c. listener()

awaitable<void> listener() {
    auto executor = co_await this_coro::executor;
    tcp::acceptor acceptor(executor, { tcp::v4(), 10086 });
    for (;;) {
        tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
        co_spawn(executor, echo(std::move(socket)), detached);
    }
}

该函数不断监听 TCP 端口,接受来自客户端的连接。每当有新连接到达时,它会启动一个 echo 协程来处理该连接。这种设计使得服务器能够同时处理多个客户端连接而不会阻塞,提高了并发处理能力。

auto executor = co_await this_coro::executor;

获取执行器:

  • this_coro::executor 是特殊的上下文,用于获取当前协程的执行器(executor),它定义了协程将在哪个上下文(io_context)中运行
  • co_await 关键字使得协程在获取执行器时可以暂停,并在获取到执行器后恢复执行。
co_spawn(executor, echo(std::move(socket)), detached);
  • 启动处理协程:
    • co_spawn 启动一个新的协程
    • executor 指定了新的协程的执行上下文
    • echo(std::move(socket)) 创建一个新的 echo 协程来处理该连接。std::move(socket) 将 socket 移动到 echo 协程中,避免不必要的拷贝。移动socket之后,上面的socket便无法发挥作用,因为该socket已经被移动至echo中。
    • detached 表示新协程的执行不需要主协程等待其完成。

d. main()

int main()
{
    try {
        boost::asio::io_context io_context(1); // 1被用于提供有关所需并发级别的提示
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&](auto, auto) { // 处理几个信号就传入几个参数,这里使用auto自动推断
            io_context.stop();
            });
        co_spawn(io_context, listener(), detached);
        io_context.run();

    }
    catch (std::exception& e) {
        std::cout << "Exception is " << e.what() << std::endl;
    }
}

io_context有多个重载,这里使用的重载原型为

explicit io_context(int concurrency_hint);

concurrency_hint用来提示实现该类的系统,它应当允许多少个线程(不是协程)同时运行。

  • concurrency_hint=0时,则I/O操作的实现将使用默认的并发级别,此时,io_context 将根据内部实现和系统资源自动决定使用多少线程;
  • concurrency_hint=1时,则I/O操作的实现将尝试最小化线程的创建,并且不会创建额外的工作线程,常表示仅使用一个线程来处理所有 I/O 操作,适用于大多数简单的应用场景,避免不必要的线程开销;
  • concurrency_hint>1时,则I/O操作的实现将允许同时运行多个工作线程,允许程序在多个线程中并行处理 I/O 操作,从而提高性能。
        signals.async_wait([&](auto, auto) { // 处理几个信号就传入几个参数,这里使用auto自动推断
            io_context.stop();
            });

信号处理,当遇到退出信号(ctrl+c或强制终止信号)时,执行lambd函数,停止ioc的运行。

co_spawn(io_context, listener(), detached);

启动一个listener协程,开始监听客户端连接,并且这个协程的执行不需要主协程等待其完成。

2. 客户端

#include <iostream>
#include <boost/asio.hpp>

const int MAX_LENGTH = 1024;

int main()
{
    try {
        boost::asio::io_context ioc;
        boost::asio::ip::tcp::endpoint remote_ep(boost::asio::ip::address::from_string("127.0.0.1"), 10086);
        boost::asio::ip::tcp::socket sock(ioc);
        boost::system::error_code error = boost::asio::error::host_not_found;
        sock.connect(remote_ep, error);
        if (error) {
        std::cout << "connect failed, code is " << error.value() << " error msg is " << error.what() << std::endl;

        return 0;
        }

        std::cout << "Enter message: ";
        char request[MAX_LENGTH];
        std::cin.getline(request, MAX_LENGTH);
        size_t request_length = strlen(request);
        boost::asio::write(sock, boost::asio::buffer(request, request_length));

        char reply[MAX_LENGTH];
        size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply, request_length));
        std::cout << "reply is " << std::string(reply, reply_length) << std::endl;
        getchar();
    }
    catch (std::exception& e) {
        std::cerr << "Exception is " << e.what() << std::endl;
    }

    return 0;
}

和之前的客户端处理基本类似,只不过忽略了消息节点封装和序列号操作。

3. 修改之前的服务器函数

void CSession::Start() {
    auto shared_this = shared_from_this();
    //开启接收协程
    co_spawn(_io_context, [=]()->awaitable<void> {
        try {
            for (;!_b_close;) {
                _recv_head_node->Clear();
                std::size_t n = co_await boost::asio::async_read(_socket,
                    boost::asio::buffer(_recv_head_node->_data, HEAD_TOTAL_LEN),
                    use_awaitable);
                if (n == 0) {
                    std::cout << "receive peer closed" << endl;
                    Close();
                    _server->ClearSession(_uuid);
                    co_return;
                }
                //获取头部MSGID数据
                short msg_id = 0;
                memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
                //网络字节序转化为本地字节序
                msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
                std::cout << "msg_id is " << msg_id << endl;
                //id非法
                if (msg_id > MAX_LENGTH) {
                    std::cout << "invalid msg_id is " << msg_id << endl;
                    _server->ClearSession(_uuid);
                    co_return;
                }
                short msg_len = 0;
                memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
                //网络字节序转化为本地字节序
                msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
                std::cout << "msg_len is " << msg_len << endl;
                //长度非法
                if (msg_len > MAX_LENGTH) {
                    std::cout << "invalid data length is " << msg_len << endl;
                    _server->ClearSession(_uuid);
                    co_return;
                }
                _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
                //读出包体
                n = co_await boost::asio::async_read(_socket,
                    boost::asio::buffer(_recv_msg_node->_data, _recv_msg_node->_total_len), use_awaitable);
                if (n == 0) {
                    std::cout << "receive peer closed" << endl;
                    Close();
                    _server->ClearSession(_uuid);
                    co_return;
                }
                _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                cout << "receive data is " << _recv_msg_node->_data << endl;
                //投递给逻辑线程
                LogicSystem::GetInstance().PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
            }
        }
        catch (std::exception& e) {
            std::cout << "exception is " << e.what() << endl;
            Close();
            _server->ClearSession(_uuid);
        }
        }, detached);
}

在新的Session中,不需要绑定回调函数进行处理,而是通过关键字co_await 和适配器use_awaitable,使异步函数通过同步方式写出来,在一个函数中进行数据的粘包处理、网络序列-本地序列转换、序列化处理,并将消息投递至逻辑队列。

通过协程实现并发服务器可大大减少代码量,相比异步编程更加直观,但受限于平台,目前C++20的协程说是协程库,实际上只是开放了无栈协程的协议,正儿八经的官方协程还未发布。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2206240.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【python】之socket编程(附带无偿源码)

本章内容 1、socket 2、IO多路复用 3、socketserver Socket socket起源于Unix&#xff0c;而Unix/Linux基本哲学之一就是“一切皆文件”&#xff0c;对于文件用【打开】【读写】【关闭】模式来操作。socket就是该模式的一个实现&#xff0c;socket即是一种特殊的文件&…

【路径规划】自主机器人的路径规划和导航

摘要 本文讨论了如何利用路径规划算法对自主机器人进行路径规划和导航。自主机器人在环境中的路径规划是通过参考路径与机器人的当前位置进行比对&#xff0c;采用纯追踪算法&#xff08;Pure Pursuit&#xff09;进行路径跟踪&#xff0c;以确保机器人沿预定路线行驶。本文通…

黑马程序员C++核心编程学习笔记

黑马程序员C核心编程学习笔记 一、内存 1.1 内存四区 C程序在执行时&#xff0c;将内存大致分为4个区域&#xff1a;代码区&#xff0c;全局区&#xff0c;栈区&#xff0c;堆区 代码区&#xff1a;存放函数体的的二进制代码&#xff0c;操作系统管理。 &#x1f535;特点&a…

从数据管理到功能优化:Vue+TS 项目实用技巧分享

引言 在项目开发过程中&#xff0c;优化用户界面和完善数据处理逻辑是提升用户体验的重要环节。本篇文章将带你一步步实现从修改项目图标、添加数据、优化日期显示&#xff0c;到新增自定义字段、调整按钮样式以及自定义按钮跳转等功能。这些操作不仅提升了项目的可视化效果&am…

双十一适合买什么?2024双十一值得入手好物推荐

即将来临的2024年双十一&#xff0c;有哪些超值宝贝会令人忍不住疯狂下单呢&#xff1f;双十一购物狂欢节&#xff0c;这个一年一度的盛大庆典&#xff0c;向来使我们这些热衷于购物的消费者们激动万分。那么&#xff0c;在今年的双十一&#xff0c;究竟有哪些商品能够成功吸引…

利用FnOS搭建虚拟云桌面,并搭建前端开发环境(二)

利用FnOS搭建虚拟云桌面&#xff0c;并搭建前端开发环境 二 一、docker镜像二、环境配置三、核心环境配置流程文档 利用FnOS搭建虚拟云桌面&#xff0c;并搭建前端开发环境&#xff08;一&#xff09; 上一章安装了飞牛FnOS系统&#xff0c;界面如下&#xff0c;这一张配置前端…

Docker安装Minio+SpringBoot上传下载文件

Docker 安装Minio docker pull minio/minio docker images REPOSITORY TAG IMAGE ID CREATED SIZE minio/minio latest 162489e21d26 7 days ago 165MB nginx latest 7f553e8bbc89 7 days ago 192MB # 外挂磁盘存储使用 mkdir -p…

高清实拍类型视频素材网站推荐

大家好&#xff0c;我是一名新媒体创作者&#xff0c;今天想和大家分享一些平时常用的高清实拍类型视频素材资源。作为新媒体人&#xff0c;视频素材的质量直接影响作品的受欢迎程度&#xff0c;因此找到优质的视频素材库非常重要。接下来&#xff0c;我将为大家推荐一些非常优…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-12

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-12 1. Autoregressive Large Language Models are Computationally Universal D Schuurmans, H Dai, F Zanini - arXiv preprint arXiv:2410.03170, 2024 https://arxiv.org/pdf/2410.03170 自回归大型语言模型…

太速科技-628-基于VU3P的双路100G光纤加速计算卡

基于VU3P的双路100G光纤加速计算卡 一、板卡概述 基于Xilinx UltraScale16 nm VU3P芯片方案基础上研发的一款双口100 G FPGA光纤以太网PCI-Express v3.0 x16智能加速计算卡&#xff0c;该智能卡拥有高吞吐量、低延时的网络处理能力以及辅助CPU进行网络功能卸载的能力…

UE5安卓,多指点击时会调出控制台

参考文章&#xff1a; How to turn off "console window" on swipe (my Lemurs keep opening it!) - Platform & Builds / Mobile - Epic Developer Community Forums (unrealengine.com) 准确来说是4只手指同时在屏幕中按下。这个控制台能像编辑器那样&#xf…

浏览器和客户端结合的erp系统,java控制浏览器操作自动登录,socket客户端通信进行表单赋值

java做一个toB的客户端操作系统&#xff0c;客户端和web的结合&#xff1b; 主要是使用java编写客户端代码&#xff0c;采用selenium控制浏览器&#xff0c;主要是用到selenium自动化测试的功能&#xff1b; javaEE 项目调用 selenium使用谷歌控件chromedriver.exe控制浏览器…

小米员工薪资一览表

小米 之前我们写了 京东 和 华为OD&#xff0c;不少同学在后台点名要看小米的职级和薪资。 没问题&#xff0c;在了解小米的薪资分布前&#xff0c;我们要先对小米职级有个初步概念。 小米职级从 13 到 22&#xff0c;共 10 级。 title 大致分为 专员&#xff08;13~15级&#…

go语言中的template使用

在 Go 语言中&#xff0c;你可以使用 text/template 或 html/template 包来创建和执行模板。以下是一个基本示例&#xff0c;展示如何使用 Go 的模板语法&#xff1a; 1. 导入包 import ("os""text/template" )2. 创建数据结构 定义一个数据结构&#x…

反向指标KDJ?只要做个简单的魔改,就能一直在新高路上!

KDJ又叫随机指标&#xff0c;是一个适用于短线的技术指标&#xff0c;在股票、期货等市场受到广泛使用。在老Q看来&#xff0c;这是一个很有趣的指标。但是如果你按照经典用法来使用的话&#xff0c;它就变成财富毁灭机了&#xff01; 下边&#xff0c;老Q就一步步从统计原理、…

【阿里云中的大数据组件】技术选型和数仓系统流程设计 --- 阿里云的组件简介

文章目录 一、DataHub二、DataWorks 和 MaxCompute三、RDS四、技术选型和对比1、阿里云技术跟之前的技术对比2、技术选型 五、系统流程设计 一、DataHub 通俗来说这个 DataHub 类似于传统大数据解决方案中 Kafka 的角色&#xff0c;提供了一个数据队列功能 对于离线计算&#x…

ES 全文检索完全匹配高亮查询

我们ES会将数据文字进行拆词操作&#xff0c;并将拆解之后的数据保存到倒排索引当中几十使用文字的一部分也能查询到数据&#xff0c;这种检索方式我们就称之为全文检索&#xff0c;ES的查询结果也会倒排索引中去查询匹配 下面的查询结果中输入的词&#xff0c;就是输入小也可…

PDF文件怎么添加水印?这里有6个方法

PDF文件怎么添加水印&#xff1f;在职场中&#xff0c;随着信息数字化的普及&#xff0c;PDF文件已成为我们日常工作中不可或缺的一部分。然而&#xff0c;如何在这些文件中确保信息的安全性和版权保护&#xff0c;成为了许多企业面临的重要课题。其中&#xff0c;给PDF文件添加…

Android常用组件

目录 1. TextView 控件 常用属性: 1&#xff09;android:text: 2&#xff09;android:gravity: 3&#xff09;android:textSize: 4&#xff09;android:textColor: 5&#xff09;android:background: 6&#xff09;android:padding: 7&#xff09;android:layout_width 和 andr…

Web集群服务-Nginx

1. web服务 1. WEB服务:网站服务,部署并启动了这个服务,你就可以搭建一个网站 2. WEB中间件: 等同于WEB服务 3. 中间件:范围更加广泛,指的负载均衡之后的服务 4. 数据库中间件:数据库缓存,消息对列 2. 极速上手指南 nginx官网: nginx documentation 2.1 配置yum源 vim /etc/…