Rust Web 进阶(一):Rust异步编程(Tokio)

news2024/11/27 20:58:35

这一篇将讲解什么是和为什么进行异步编程,Rust 怎么样进行异步编程以及其机制,并且讲解目前 rust 常用的异步编程的函数库。本章的内容来自杨旭老师的教程:

https://www.bilibili.com/video/BV16r4y187P4/?spm_id_from=333.999.0.0&vd_source=8595fbbf160cc11a0cc07cadacf22951

选择多线程/异步编程的理由

并发 是指程序的不同部分可以同时不按照顺序的执行并且不影响最终结果的能力,比如我们有一个任务是计算 1+1 , 2+2 再把结果累加起来,那么 1+ 1 和 2+2 可以同时运行,或者 2+2 先运行 1+1 后运行,或者 1+1 先运行,结果都是 6 不会发生变化。

并行 同时可以执行多个任务。比如我们有两个任务计算输出 1+1 和 2+2 ,那么他们可以一起运行,之后输出 2 和 4 ,当然我们并不知道哪个任务先完成,所以我们不知道 2 和 4 ,谁会先输出。

本教程中将并发和并行简称统称为并发

我们需要让程序并发/并行执行的原因是:

  • 用户层面:让软件运行的更快,因为我们有多个任务可以同时进行或者更好的调度进行,所以程序的效率可以大大提高。
  • 计算机层面:多CPU 和 多核CPU 的出现以及 CPU调度算法的实现,让异步编程成为了可能,让我们能写出整体性更好的程序。

软件程序处理任务有两种类型:

  • CPU密集型:占用CPU的资源,比如文件压缩,视频编码等。通常可以利用多CPU或者多核心CPU 进行处理。
  • IO 密集型:占用IO资源的任务。从文件系统或者数据库访问数据,处理HTTP/TCP 请求。例如在 web 服务器中,我们通过 CRUD 将数据传递过来,这时要求CPU 等待数据写入磁盘,但是磁盘很慢,CPU就是干等的状态,这时候如果是异步编程就可以让CPU执行其他的任务。

多线程和异步编程的使用场景

假如我们有一个程序处理包含三个操作:处理数据、阻塞、把任务返回的数据打包。如下是同步编程、多线程和异步编程的区别:

  • 同步需要等待程序阻塞结束才能进行下一个任务

  • 多线程可以同时进行多个任务,效率更高

  • 异步编程在 task1 阻塞的时候调用了其他的任务,也提升了效率

请添加图片描述

如上图,多线程看似是效率最高的,但是我们可以看下一个场景:

假设我们有一个 web 服务器,它接收多个请求,我们可以使用多线程来解决问题,针对每一个请求,都开启一个原生的系统线程来处理它,但是它引入了新的问题,请求执行的顺序无法预测,同时会产生死锁和竞争资源等问题,很可能出现我们先删除了数据,再查询数据的情况。

同时我们的Rust 是1:1线程模型,它对总线程数的有限制的,所以多线程不一定适合所有的场景。而这个场景如果使用异步编程来运行的话,对于每个任务,服务器生成一个任务来处理它,由异步运行时安排各个异步任务在可用的 CPU上执行。

异步编程就是 ,在CPU 等待外部事件或者动作的时候,异步运行时会安排其他的可继续执行的任务在 CPU 上执行。而从磁盘或者IO 的中断到达时,异步运行时会识别这件事,安排原来的任务继续执行。一般来说 IO 受限的程序(运行速度依赖于 IO 的速度)比起 CPU 受限的任务 (运行速度依赖于 CPU 的速度),更加适合异步任务的执行。

同步、异步和多线程的代码例子

下面是一些简单的例子,我们编写两个函数,分别阻塞 4 秒和 2 秒后返回,如果我们像这样编写函数同步运行这两个函数,我们会在 4 秒后得到 1 ,再在 2 秒后得到 2,总计运行 6 秒:

use std::thread::sleep;
use std::time::Duration;

fn main() {
    println!("Hello, world!");
    let file1_content = read1();
    println!("{}",file1_content);
    let file2_content = read2();
    println!("{}",file2_content);
}

fn read1() -> String {
    sleep(Duration::new(4,0));
    String::from("1")
}

fn read2() -> String {
    sleep(Duration::new(2,0));
    String::from("2")
}

如果我们将它改造成多线程的运行模式,我们会在 2 秒后得到 2 ,再在 2 秒后得到 1,总计运行 4 秒:

use std::thread::{sleep, self};
use std::time::Duration;

fn main() {
    println!("Hello, world!");

    let sp1 = thread::spawn(||{
        let file1_content = read1();
        println!("{}",file1_content);
    });
    let sp2 = thread::spawn(||{
        let file2_content = read2();
        println!("{}",file2_content);
    });
  
   sp1.join().unwrap();
   sp2.join().unwrap();

}

fn read1() -> String {
    sleep(Duration::new(4,0));
    String::from("1")
}

fn read2() -> String {
    sleep(Duration::new(2,0));
    String::from("2")
}

最后我们将它改造成异步的,我们需要引入一个依赖:

[dependencies]
tokio = {version = "1", features = ["full"]}

之后将我们的代码改写成这样,运行后我们可以得到和多线程一样的效果,但是这两个任务可以执行在同一个线程上,也可以执行在不同的线程上,这依赖于我们异步运行时的调度:

use std::thread::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    println!("Hello, world!");

    let sp1 = tokio::spawn(async {
        read1().await;
    });
    let sp2 = tokio::spawn(async {
        read2().await;
    });

    let _ = tokio::join!(sp1, sp2);
}

async fn read1() -> String {
    sleep(Duration::new(4, 0));
    println!("1");
    String::from("1")
}

async fn read2() -> String {
    sleep(Duration::new(2, 0));
    println!("2");
    String::from("2")
}

理解 Rust 的异步编程

  • async

在 rust 中,我们可以使用 async 和 await 关键字来进行异步编程,如果一个函数是 async (异步) 的,那么你需要在调用它的时候加上 await 关键字,在 await 的过程中,CPU可以去分配别的任务,直到 这个进程运行完毕再继续执行后续的操作。同样如果你的函数里使用了需要 await 的异步函数,那么你调用这些函数的函数也是异步的:

比如下面的例子, async_read 是一个可以读取文本内容的异步函数,显然,读取文本是一个 IO 操作,它在进行 IO 操作的过程中可以让CPU 去做别的操作,所以它是异步的。

因为是异步的,所以调用它的时候要使用 await 关键字,这会让它等待直到 IO 完毕后继续执行这个任务。

因为使用了异步的 async_read 函数,那么我们的 hello 函数也是 async 的,所以我们要加上修饰。

async fn hello() {
    let content = async_read("a.txt").await;
    println!("{}", content);
    let content = async_read("b.txt").await;
    println!("{}", content);
}
  • Future

Rust 的异步是由 Future 机制实现的, Future 是由异步计算或者函数计算产生的值,Rust 异步函数都会返回 Future。

Future 里具有一个 poll 方法,用于检查异步任务是不是完成了,它返回一个枚举 Poll,具有 Pending 和 Ready(val)两个值,Pending 表示没有完成,而 Ready(val)表示异步方法完成了,val 是返回的值。

pub trait Future {
    type Output;
    pub fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

异步执行器是异步运行时的一部分,它会管理一个 Future 的集合,并通过调用 Future 上的 poll 方法来驱动他们完成,在 Rust 中 Async 是一个语法糖,在加上了 async 之后,就相当于告诉异步执行器它返回一个 Future ,这个 Future 会被驱动直到完成。所以我们在上一个部分中的例子相当于是这样的:

fn read1() -> impl Future<Output = String> {
    async {
        sleep(Duration::new(4, 0));
        println!("1");
        String::from("1")
    }
}

我们可以将我们的代码改造成这样,其中一个任务使用我们的自己编写的 Future ,它只会返回 Pending ,现在我们运行我们的项目,sp1 它永远不会结束,返回 poll 之后就不再运行了,而 sp2 可以正常运行:

use std::future::Future;
use std::task::Poll;
use std::thread::sleep;
use std::time::Duration;

struct ReadFile {}

impl Future for ReadFile {
    type Output = String;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        println!("poll");
        Poll::Pending
    }
}
#[tokio::main]
async fn main() {
    println!("Hello, world!");

    let sp1 = tokio::spawn(async {
        let future1 = ReadFile{};
        future1.await
    });
    let sp2 = tokio::spawn(async {
        read2().await;
    });

    let _ = tokio::join!(sp1, sp2);
}


fn read2() -> impl Future<Output = String> {
    async {
        sleep(Duration::new(2, 0));
        println!("2");
        String::from("2")
    }
}
  • Waker

那么我们应该怎么样判定一个程序应该被 poll 呢?Rust 使用 Waker 来解决异步执行的问题,当一个任务被 poll 但是返回 Pending 的时候,它就会被注册到 Waker 中,Waker 有一个 wake( ) 方法,告诉异步执行器关联的任务被唤醒了,当 wake( ) 方法被调用了,异步执行器就会被通知再次调用这个任务的 poll 方法。

我们把程序改造成如下的样子,现在我们为我们的程序添加了唤醒,但是我们还是返回 Pending ,此时程序会源源不断的输出 poll ,因为我们的程序执行 poll 方法之后返回 Pending ,那么我们的程序应该被放到 Waker 中,但是我们又执行了 waker().wake_by_ref () 方法,它会告诉异步运行时,我们的项目准备好了,此时我们的程序再次调用 poll 方法,但是得到的还是 Pending ,所以程序会一直输出 poll ,永远不会停止。

use std::future::Future;
use std::task::Poll;
use std::thread::sleep;
use std::time::Duration;

struct ReadFile {}

impl Future for ReadFile {
    type Output = String;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        println!("poll");
        _cx.waker().wake_by_ref();
        Poll::Pending
    }
}
#[tokio::main]
async fn main() {
    println!("Hello, world!");

    let sp1 = tokio::spawn(async {
        let future1 = ReadFile{};
        future1.await
    });
    let sp2 = tokio::spawn(async {
        read2().await;
    });

    let _ = tokio::join!(sp1, sp2);
}

fn read2() -> impl Future<Output = String> {
    async {
        sleep(Duration::new(2, 0));
        println!("2");
        String::from("2")
    }
}

为了能够使得项目异步任务正常结束,我们编写一个逻辑,设定任务的执行时间,如果时间未到,我们就让系统 sleep 直到任务结束,再调用 wake() 函数唤醒进程,现在我们的代码可以达到和 同步、异步和多线程的代码例子 这一节的代码一样的效果了,但是我们通过手动编码的方式模拟了它的运行流程,使得我们可以更加清楚的了解 rust 异步的运行机制:

use std::future::Future;
use std::task::Poll;
use std::thread::sleep;
use std::time::{Duration, Instant};

struct ReadFile {
    time: Instant,
}

impl Future for ReadFile {
    type Output = String;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        if Instant::now() >= self.time {
            println!("ready");
            Poll::Ready(String::from("1"))
        } else {
            println!("poll");
            let waker = _cx.waker().clone();
            let time = self.time;
            std::thread::spawn(move || {
                let current_time = Instant::now();
                if current_time < time {
                    std::thread::sleep(time - current_time);
                }
                waker.wake();
            });
            Poll::Pending
        }
    }
}
#[tokio::main]
async fn main() {
    println!("Hello, world!");

    let sp1 = tokio::spawn(async {
        let future1 = ReadFile {
            time: Instant::now() + Duration::from_millis(4000),
        };
        println!("{:?}", future1.await);
    });
    let sp2 = tokio::spawn(async {
        read2().await;
    });

    let _ = tokio::join!(sp1, sp2);
}

fn read2() -> impl Future<Output = String> {
    async {
        sleep(Duration::new(2, 0));
        println!("2");
        String::from("2")
    }
}

Tokio

Rust 只提供关于异步的的最小集合的相关概念,异步的调度,也就是异步运行时的操作需要我们依赖社区提供,比较常用的就是上文提到的 tokio 这个库,官网: https://tokio.rs/

以下是 Tokie 的组成部分:

  • Tokio 运行时需要理解 OS 内核的方法来开启 IO 操作
  • Tokie 运行时会注册异步的处理程序,以便在事件发生时作为 IO 操作的一部分进行调用
  • Tokie 反应器负责从内核监听这些事件并且与 Tokio 其他部分通信
  • Tokie 执行器会把一个 Future 取得更多进展的时候,调用 poll 来驱动它的运行
  • Future 调用 Tokie Waker 上的 wake 方法,Waker 就会通知执行器,然后把 Future 放回队列,再次调用 poll ,直到 Future 完成

请添加图片描述

整个 Tokio 的简化运行过程大概是:

  • Main 函数在 Tokio 运行时上生成任务 1
  • 任务1 有一个 Future ,从一个大文件读取数据
  • 从文件读取内容的请求交给系统内核的文件子系统
  • 此时,任务2 被交给 Tokio 运行时安排处理
  • 当任务1 操作结束的时候,文件子系统会触发一个中断,他被 Tokio 效应器识别
  • Tokio 通知任务 1 准备好了
  • 任务1 通知他注册的 Waker 说明他可以产生一个值了
  • Waker 通知 Tokio 执行器来调用任务1 的poll
  • Tokio 执行器安排任务 1 进行处理,调用 poll
  • 任务 1 产生一个值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/402521.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ADSP21489之CCES开发笔记(七)

一、SPORT简介&#xff1a;ADI21489具有八个独立的同步串行端口(SPORT)&#xff0c;可为各种外围设备提供I/O接口。他们是称为SPORT0至SPORT7&#xff0c;每个SPORT都有自己的一组控制寄存器和数据缓冲器&#xff0c;具有一定范围的时钟和帧同步选项。 二、特性&#xff1a;支持…

【云原生】Apisix接入Nacos、K8s服务注册中心自动获取服务

背景我在K8s中部署了两个使用SpringCLoudK8s框架的微服务&#xff0c;每个服务既暴露了HTTP接口方便测试&#xff0c;也暴露了gRpc接口更接近生产&#xff0c;他们的端口如下所示&#xff1a;常规功能上游我们可以在这里配置我们的服务HTTP配置一个HTTP服务&#xff0c;输入地址…

设计模式 - 行为型 - 模板模式学习

现象&#xff1a; 设计模式 - 行为型 - 模板模式学习 介绍&#xff1a; 模板模式的设计思路&#xff0c;在抽象类中定义抽象方法的执行顺序&#xff0c; 并将抽象方法设定为只有子类实现&#xff0c;但不提供独立访问的方法 只能通过已经被安排好的定义方法去执行 可以控制整…

如何快速生成数据字典SQL语句

如何快速生成数据字典SQL语句 一、首先我们找到需要生成的数据字典的国家标准 以民族数据字典为例 打开浏览器搜索民族字典代码表得到如下数据&#xff0c;并把得到的数据存入Excel表格中 国标民族数据字典 国标民族字典 第七次全国人口普查民族代码表与民族国标代码 第七次…

JVM垃圾回收之GCRoots可达性分析

已经死亡的对象&#xff0c;不可达的对象&#xff0c;肯定会被回收。 什么样的对象会被回收&#xff1f; 判定的算法有两种&#xff1a;引用计数法和可达性分析算法。 引用计数法&#xff1a;&#xff08;不使用这种&#xff09; 给对象中添加一个引用计数器&#xff0c;每当…

springboot项目打jar包发布上线、查看日志和进程号

目录前言一、Maven打包1.1 删除test文件和对应依赖&#xff08;不建议&#xff09;1.2 pom.xml中配置跳过测试1.3 使用idea打包1.4 使用maven命令打包二、启动jar包2.1 简单启动2.2 后台运行并打印日志2.3 脚本启动三、查看日志3.1 tail命令查看日志3.2 cat命令查看日志四、其他…

员工为什么对绩效考核不满意?管理者应该怎么做?

绩效考核是公司管理员工的重要工具&#xff0c;员工通过绩效考核可以衡量自己的工作效果和完成任务的能力&#xff0c;能够帮助管理者更好的了解员工的工作情况和绩效表现。 但是&#xff0c;现实中很多员工对绩效考核“不满意”&#xff0c;认为绩效考核不公正、不透明、不准确…

docker基础命令-阳哥

docker基础篇-阳哥 文章目录docker基础篇-阳哥centos7最小安装准备工作网络设置安装必备工具1.1 安装工具1.5 优化ssh连接1.1 修改ssh服务的配置文件1.2 找到对应的行数修改如下1.3 修改完成之后重启ssh服务1.6 永久修改主机名sed命令sed命令替换文本_xbd_zc的博客-CSDN博客1.镜…

《2023年化妆品原料成分趋势报告》| 解码化妆品备案数据,洞悉2023年潜力原料成分

回顾2022年&#xff0c;是中国化妆品行业“历史转折年”。备案制度的全面改革&#xff0c;直接改变了产品备案新格局。法律法规对新品备案提出了详实的要求&#xff0c;新品出炉也设置了更高的门槛&#xff0c;所以我们清晰地看到2022年整体的化妆品备案数据大幅度下滑&#xf…

精心梳理的11个在线常用工具,提高开发效率

1、Hutool工具类——Java开发常用工具类 参考文档&#xff1a;https://hutool.cn/docs/index.html#/ 2、在线工具——各种工具整合 我主要用于时间戳转换&#xff0c;进制转换等。 地址&#xff1a;https://tool.lu/ 3、蛙蛙工具——各种文本字符等整合工具 https://www…

python@pyside样式化

文章目录refWidget类创建样式化文件qss引用样式并启动应用ref Styling the Widgets Application - Qt for PythonQt Style Sheets Reference | Qt Widgets 5.15.12 Widget类创建 创建一个简单界面(菜单主要内容)它们是水平布局 主要内容包括一段文本和一个按钮,它们是垂直布…

AI大模型,驶向产业何方?

技术更迭&#xff0c;已不是壁垒&#xff0c;国产式AI需要的是产品的创新思维&#xff0c;以及对需求的产品变现能力。 作者|斗斗 出品|产业家 “AI炒了那么多年&#xff0c;第一次感觉它真的要来了。”国内某论坛中&#xff0c;带有ChatGPT的词条下&#xff0c;几乎都会出…

【Java开发】设计模式 01:单例模式

1 单例模式介绍单例模式&#xff08;Singleton Pattern&#xff09;是Java中最为基础的设计模式。这种类型的设计模式属于创建型模式&#xff0c;它提供了一种创建对象的最佳方式。这种模式涉及到一个单一的类&#xff0c;该类负责创建自己的对象&#xff0c;同时确保只有单个对…

打怪升级之使用csv文件发送UDP数据

面临的困难 如果你想做一个基于UDP包的数据处理问题的话&#xff0c;比较好的办法是使用csv文件来进行数据的保存&#xff08;csv文件比较简单&#xff0c;方便进行各种处理&#xff09;。 CSV文件格式简单&#xff0c;一方面它可以直接被excel处理&#xff0c;另一方面它完全…

PDF转word在线转换方法!操作简单又高效

相信很多已经工作的人都知道&#xff0c;PDF文件格式的优点在于兼容性强、安全性高&#xff0c;而且查看和传输给他人都很方便。但是&#xff0c;这种格式的文件也有不太方便的地方&#xff0c;那就是不能对文件内容进行编辑和修改。对于许多人来说&#xff0c;如果想要编辑修改…

接口自动化神器推荐:免费开源Lim接口测试平台

前言 对于传统的实现接口自动化的方案往往是搭建自动化框架&#xff0c;通过excel编写用例来驱动执行&#xff0c;例如常见的万金油技术栈组合&#xff1a;excel&#xff08;编写用例&#xff09;、pytest(用例执行)、allure(测试报告)等。 很多公司往往是通过自动化框架而非…

3-2 SpringCloud快速开发入门:Ribbon 实现客户端负载均衡

接上一章节Ribbon 是什么&#xff0c;这里讲讲Ribbon 实现客户端负载均衡 Ribbon 实现客户端负载均衡 由于 Spring Cloud Ribbon 的封装&#xff0c; 我们在微服务架构中使用客户端负载均衡调用非常简单&#xff0c; 只需要如下两步&#xff1a; 1、启动多个服务提供者实例并…

Matplotlib 绘图实用大全

本文只介绍最简单基本的画图方法 预设 要想画出来的图有些逼格&#xff0c;首先应该进行如下设置 plt.rcParams[font.sans-serif][SimHei] #画图时显示中文字体 plt.rcParams[axes.unicode_minus] False #防止因修改成中文字符&#xff0c;导致某些 unicode 字符不能…

正则表达式高阶技巧之环视的组合(使用python实现)

环视的组合介绍环视的组合环视中包含环视并列多个环视注意&#xff1a;环视作为多选分支排列在多选结构中断言与反向引用之间的关系介绍 在我们日常使用的编程语言都是支持环视的&#xff0c;但是语言不同&#xff0c;支持程度也就不同&#xff0c;下面具体介绍一下在python中…

【ChatGPT前世今生】前置知识Seq2Seq入门理解

【ChatGPT前世今生】前置知识Seq2Seq入门理解1、环境准备与依赖包安装2、数据集准备3、数据集预处理与读取4、定义Seq2Seq模型的基础类5、预处理训练数据集6、定义训练过程7、定义验证过程8、执行训练与验证过程9、展示模型的结果&#xff0c;进行进一步分析最近一段时间&#…