rust tokio select!宏详解

news2025/2/22 11:12:47

rust tokio select!宏详解

简介

本文介绍Tokioselect!的用法,重点是使用过程中可能遇到的问题,比如阻塞问题、优先级问题、cancel safe问题。在Tokio 中,select! 是一个宏,用于同时等待多个异步任务,并在其中任意一个任务完成时执行相应的逻辑。

基本用法

如下代码演示了如何使用 Tokio 库实现一个异步的消息传递系统,其中包括三个无限通道和一个关闭通道。程序使用了 select! 宏来等待通道和关闭通道的事件,并在事件发生时执行相应的操作。

程序的主要步骤如下:

  1. 创建三个无限通道和一个用于传递关闭信号的通道。
  2. 向三个通道中发送一些数据。
  3. 开启一个异步任务并在两秒后发送关闭信号。
  4. 在主循环中使用 select! 宏等待通道和关闭通道的事件。
  5. 当一个通道接收到数据时,打印出数据。
  6. 当关闭通道接收到信号时,退出循环。

程序中的 select! 宏使用了类似于 match 的语法,但是它可以同时等待多个异步事件。当其中一个事件发生时,宏将执行相应的代码块,并跳出循环。在本例中,当一个通道接收到数据时,打印出数据;当关闭通道接收到信号时,退出循环。
select!经常与loop搭配使用,循环地从多个通道中接收事件并处理。

use std::time::Duration;

use tokio::select;

#[tokio::main]
async fn main() {
    let (sender1, mut receiver1) = tokio::sync::mpsc::unbounded_channel::<String>();
    let (sender2, mut receiver2) = tokio::sync::mpsc::unbounded_channel::<String>();
    let (sender3, mut receiver3) = tokio::sync::mpsc::unbounded_channel::<String>();

    let (shutdown_sender, mut shutdown_receiver) = tokio::sync::watch::channel(());
    for i in 0..3 {
        sender1.send(i.to_string()).unwrap();
        sender2.send(i.to_string()).unwrap();
        sender3.send(i.to_string()).unwrap();
    }

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        shutdown_sender.send(()).unwrap(); //两秒后关闭
    });

    loop {
        select! {
            ret = receiver1.recv() => {
                println!("channel 1 received: {:?}", ret);
            },
            ret = receiver2.recv() => {
                println!("channel 2 received: {:?}", ret);
            },
            ret = receiver3.recv() => {
                println!("channel 3 received: {:?}", ret);
            },
            _ = shutdown_receiver.changed() => {
                println!("shutdown received");
                break;
            }
        };
    }
}

可能遇到的坑

阻塞

select中的各个分支是并行执行的,这里的并行是指分支中的各个future在并行执行。不过一旦某个分支的future完成并进入了分支代码块,如果在分支代码中有一些阻塞的操作,则其他分支是没有机会执行的。
比如下面代码,在receiver1.recv()完成时,sleep了10s,sleep期间其他的分支是不会执行的。即使在2s后发送了shutdown信号,select!因为无法及时处理此信号,实际上循环也无法退出。

 loop {
        select! {
            ret = receiver1.recv() => {
                println!("channel 1 received: {:?}", ret);
                tokio::time::sleep(Duration::from_secs(10)).await;//这里等待期间,其他的分支是无法被执行的
            },
            ret = receiver2.recv() => {
                println!("channel 2 received: {:?}", ret);
            },
            ret = receiver3.recv() => {
                println!("channel 3 received: {:?}", ret);
            },
            _ = shutdown_receiver.changed() => {
                println!("shutdown received");
                break;
            }
        };
    }

这个坑在网络编程中比较容易踩到,比如select这里是从channel中取出上层应用传来的数据,并将其写入到socket中,而写socket的操作是有可能阻塞的,阻塞期间其他的分支是无法执行的。

顺序

1、默认情况下select中的各个分支执行顺序是随机的,比如上面例子中三个channel都有消息的情况下,具体去执行哪个分支是随机的。执行结果如下:
在这里插入图片描述
2、如果想要区分优先级,可以加标志biased,这样每次select将会按照从上到下的顺序去poll每个future,也就是说优先级顺序是从上往下的。比如某些场景下需要按优先级处理各个channel中的数据时这个特性就很有用。代码如下:

    loop {
        select! {
            biased;//按顺序优先执行
            ret = receiver1.recv() => {
                println!("channel 1 received: {:?}", ret);
            },
            ret = receiver2.recv() => {
                println!("channel 2 received: {:?}", ret);
            },
            ret = receiver3.recv() => {
                println!("channel 3 received: {:?}", ret);
            },
            _ = shutdown_receiver.changed() => {
                println!("shutdown received");
                break;
            }
        };
    }

运行结果如下:
在这里插入图片描述
3、顺序执行时注意饿死问题
添加了biased标志后,顺序靠前的future总是先被执行,在上述例子中,极端情况下如果靠前的channel总是有数据,那后面的channel就没有机会被执行。比如例子中如果前三个channel中一直有数据,那shutdown_receiver就无法收到shutdown信号,导致程序功能不符合预期。
解决这个问题很简单,就是把更关键的控制性的future放在最前方。

关于cancel safe

select!中如果某个分支future completed了,会将其他分支的future cancel掉,这个cancel操作要格外小心,因为如果future不是cancel safe的可能会丢数据。tokio的官方文档中给出了常见的cancel safe和不safefuture
那么如何判断自己实现的future是否是cancel safe的呢? 很简单、只需要思考如果future中的代码执行到.await时被cancel了,是否是安全的。我们来看下cancel unsafe的代码长啥样:

pub async fn read_and_write(mut message_recevier: UnboundedReceiver<Bytes>, mut file: File) {
    let message = message_recevier.recv().await.unwrap();
    file.write(&message).await.unwrap();
}

该方法从一个channel中读取消息,并将此消息写入到文件中,这个future就明显不是cancel safe的。为啥呢?试想一下,此futurechannel中读到消息之后,在写文件时被cancel掉了,那message岂不是就丢了。
实际项目中一定要格外小心这个cancel safe问题,很容易造成丢数据或者数据重复等不良反应,而且一旦出现了还很难复现、不太容易想到是这里的问题。网络编程中尤其要注意tokio::io::AsyncWriteExt::write_all不是cancel safe的,因为它内部可能是多次调用write操作才将所有缓冲区写入。

数量

1、首先select!中的分支仅支持显式地用代码书写,无法动态增减。就是说在写代码时select中的futures数量就固定了,程序运行过程中无法动态删减。
2、目前最多支持64个分支。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1257213.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jenkins流水线(pipline)实例

1、pipline 语法介绍 声明式的pipeline语法格式 1. 所有的声明都必须包含在pipeline{}中 2. 块只能有节段&#xff0c;指令&#xff0c;步骤或者赋值语句组成 3. 阶段&#xff1a;agent&#xff0c;stages&#xff0c;post&#xff0c;steps 4. 指令&#xff1a;environment&a…

独乐乐不如众乐乐(二)-某汽车零部件厂商IC EMC企业规范

前言&#xff1a;该汽车零部件厂商关于IC EMC的规范可能是小编看过的企业标准里要求最明确的一份企业标准了&#xff0c;充分说明了标准方法不是死的&#xff0c;可以灵活应用。 先看看这份规范的抬头&#xff1a; 与其他企业规范一样&#xff0c;该汽车零部件厂商的IC EMC规范…

设计模式精讲:掌握单例模式的实现与优化

掌握单例模式的实现与优化 一、引言&#xff1a;如何学习设计模式&#xff1f;二、前置知识&#xff1a;对象的创建的销毁2.1、拷贝构造2.2、拷贝赋值构造2.3、移动构造2.4、移动赋值构造 三、单例模式的定义四、单例模式的实现与优化4.1、版本一4.2、版本二4.3、版本三4.4、版…

Java PriorityQueue

一般情况下, 我们使用队列是为了能够建造队列的先进先出 (First-In-First-Out) 模式的, 达到一种资源的公平分配, 先到达的任务 (元素) 先处理, 但有时需要在队列中基于优先级处理对象。 存入队列中的任务 (元素) 具有优先级, 需要根据优先级修复里面的数据。而在 JDK 1.5 引入…

python爬虫进阶篇(异步)

学习完前面的基础知识后&#xff0c;我们会发现这些爬虫的效率实在是太低了。那么我们需要学习一些新的爬虫方式来进行信息的获取。 异步 使用python3.7后的版本中的异步进行爬取&#xff0c;多线程虽然快&#xff0c;但是异步才是爬虫真爱。 基本概念讲解 1.什么是异步&…

光线追踪-Peter Shirley的RayTracing In One Weekend系列教程(book1-book3)代码分章节整理

自己码完了一遍了&#xff0c;把代码分章节整理了一下&#xff0c;可以按章节独立编译&#xff0c;运行, 也可以直接下载编译好的release版本直接运行。 项目地址&#xff1a; Github: https://github.com/disini/RayTracingInOneWeekendChaptByChapt ​ ​ ​ ​

【赠书第8期】工程效能十日谈

文章目录 前言 1 工程效能十日谈 1.1 制定清晰的目标和计划 1.2 引入先进的技术和工具 1.3 建立有效的沟通机制 1.4 灵活应对变化 1.5 确保资源充足 1.6 进行有效的风险管理 1.7 进行持续的监控和评估 1.8 优化团队合作 1.9 注重质量管理 1.10 进行项目总结和反思 …

【Amazon】安装Cloudwatch代理监控EC2

文章目录 一、实验概要二、实验操作步骤2.1 创建 CloudWatch 代理运行角色2.2 安装 CloudWatch 代理软件包2.3 使用 CloudWatch代理收集指标2.4 CloudWatch指标收集确认 三、参考链接 一、实验概要 使用 CloudWatch 代理从 Amazon EC2 实例和本地服务器中收集指标、日志和跟踪信…

NetSuite 应用性能管理器(APM)

前段时间&#xff0c;我们发过一个文章谈系统健康检查。 NetSuite ERP系统健康检查-CSDN博客文章浏览阅读119次。“健康检查”本质上是属于信息化持续改善的组成部分。如果说信息化是一个持续不断的过程的话&#xff0c;那么“健康检查”就是持续不断的PDCA中的Check那一环。h…

pygame加载图像,并让小球做平抛运动

文章目录 load转换和存储自由落体 在游戏设计中&#xff0c;图像显示是必不可少的功能&#xff0c;pygame中的image模块便用于加载图像。 load 通过load函数&#xff0c;可以加载多种图像格式&#xff0c;如下表所示 旧版本bmp, gpeg, png, pcx, tiff, xpmc, lbm(以及pbm, p…

CountDownLatch实战应用——批量数据多线程协调异步处理(主线程执行事务回滚)

&#x1f60a; 作者&#xff1a; 一恍过去 &#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390 &#x1f38a; 社区&#xff1a; Java技术栈交流 &#x1f389; 主题&#xff1a; CountDownLatch实战应用——批量数据多线程协调异步处理(主线程执行事务…

MIT 6.824 -- MapReduce Lab

MIT 6.824 -- MapReduce Lab 环境准备实验背景实验要求测试说明流程说明 实验实现GoLand 配置代码实现对象介绍协调器启动工作线程启动Map阶段分配任务执行任务 Reduce 阶段分配任务执行任务 终止阶段 崩溃恢复 注意事项并发安全文件转换golang 知识点 测试 环境准备 从官方gi…

nginx配置文件的简单结构

nginx的配置文件&#xff08;nginx.conf&#xff09;整体上可分为三个部分&#xff1a;全局块、events块、http块 区域职责全局块配置和nginx运行相关的全局配置events块配置和网络连接相关的配置http块配置代理、缓存、日志记录、虚拟主机等配置在http块中&#xff0c;可以包含…

Linux的基本指令(四)

目录 前言 时间相关的指令 date指令 时间戳 日志 时间戳转化为具体的时间 cal指令 find指令&#xff08;十分重要&#xff09; grep指令&#xff08;行文本过滤工具&#xff09; 学前补充 什么是打包和压缩&#xff1f; 为什么要打包和压缩&#xff1f; 怎么打包和…

【洛谷算法题】P5715-三位数排序【入门2分支结构】

&#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P5715-三位数排序【入门2分支结构】&#x1f30f;题目描述&#x1f30f;输入格式…

基于OGG实现MySQL实时同步

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

常见树种(贵州省):020女贞、异叶梁王茶、掌叶梁王茶、鹅掌柴、楤木、柞木、华重阳木、马蹄荷、山桐子、刺楸

摘要&#xff1a;本专栏树种介绍图片来源于PPBC中国植物图像库&#xff08;下附网址&#xff09;&#xff0c;本文整理仅做交流学习使用&#xff0c;同时便于查找&#xff0c;如有侵权请联系删除。 图片网址&#xff1a;PPBC中国植物图像库——最大的植物分类图片库 一、女贞 …

我的创作纪念日-五周年

机缘 5年前&#xff0c;作为一名技术人员&#xff0c;平时利用CSDN作为学习平台工具&#xff0c;帮助解决工作中遇到的问题。随着30、35中年危机渐行渐近&#xff0c;回过头来发现平时虽然也有记录整理学习笔记的习惯&#xff0c;但还没有一个可以持续鞭笞自己和记录自己学习的…

C#,数值计算——插值和外推,RBF_fn 与 RBF_gauss 的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { public interface RBF_fn { double rbf(double r); } } ---------------------------------------------- using System; namespace Legalsoft.Truffer { public class RBF_gauss : RBF…

transformers pipeline出现ConnectionResetError的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…