这一篇将讲解什么是和为什么进行异步编程,Rust 怎么样进行异步编程以及其机制,并且讲解目前 rust 常用的异步编程的函数库。本章的内容来自杨旭老师的教程:
https://www.bilibili.com/video/BV16r4y187P4/?spm_id_from=333.999.0.0&vd_source=8595fbbf160cc11a0cc07cadacf22951
选择多线程/异步编程的理由
并发 是指程序的不同部分可以同时不按照顺序的执行并且不影响最终结果的能力,比如我们有一个任务是计算 1+1 , 2+2 再把结果累加起来,那么 1+ 1 和 2+2 可以同时运行,或者 2+2 先运行 1+1 后运行,或者 1+1 先运行,结果都是 6 不会发生变化。
并行 同时可以执行多个任务。比如我们有两个任务计算输出 1+1 和 2+2 ,那么他们可以一起运行,之后输出 2 和 4 ,当然我们并不知道哪个任务先完成,所以我们不知道 2 和 4 ,谁会先输出。
本教程中将并发和并行简称统称为并发
我们需要让程序并发/并行执行的原因是:
- 用户层面:让软件运行的更快,因为我们有多个任务可以同时进行或者更好的调度进行,所以程序的效率可以大大提高。
- 计算机层面:多CPU 和 多核CPU 的出现以及 CPU调度算法的实现,让异步编程成为了可能,让我们能写出整体性更好的程序。
软件程序处理任务有两种类型:
- CPU密集型:占用CPU的资源,比如文件压缩,视频编码等。通常可以利用多CPU或者多核心CPU 进行处理。
- IO 密集型:占用IO资源的任务。从文件系统或者数据库访问数据,处理HTTP/TCP 请求。例如在 web 服务器中,我们通过 CRUD 将数据传递过来,这时要求CPU 等待数据写入磁盘,但是磁盘很慢,CPU就是干等的状态,这时候如果是异步编程就可以让CPU执行其他的任务。
多线程和异步编程的使用场景
假如我们有一个程序处理包含三个操作:处理数据、阻塞、把任务返回的数据打包。如下是同步编程、多线程和异步编程的区别:
-
同步需要等待程序阻塞结束才能进行下一个任务
-
多线程可以同时进行多个任务,效率更高
-
异步编程在 task1 阻塞的时候调用了其他的任务,也提升了效率
如上图,多线程看似是效率最高的,但是我们可以看下一个场景:
假设我们有一个 web 服务器,它接收多个请求,我们可以使用多线程来解决问题,针对每一个请求,都开启一个原生的系统线程来处理它,但是它引入了新的问题,请求执行的顺序无法预测,同时会产生死锁和竞争资源等问题,很可能出现我们先删除了数据,再查询数据的情况。
同时我们的Rust 是1:1线程模型,它对总线程数的有限制的,所以多线程不一定适合所有的场景。而这个场景如果使用异步编程来运行的话,对于每个任务,服务器生成一个任务来处理它,由异步运行时安排各个异步任务在可用的 CPU上执行。
异步编程就是 ,在CPU 等待外部事件或者动作的时候,异步运行时会安排其他的可继续执行的任务在 CPU 上执行。而从磁盘或者IO 的中断到达时,异步运行时会识别这件事,安排原来的任务继续执行。一般来说 IO 受限的程序(运行速度依赖于 IO 的速度)比起 CPU 受限的任务 (运行速度依赖于 CPU 的速度),更加适合异步任务的执行。
同步、异步和多线程的代码例子
下面是一些简单的例子,我们编写两个函数,分别阻塞 4 秒和 2 秒后返回,如果我们像这样编写函数同步运行这两个函数,我们会在 4 秒后得到 1 ,再在 2 秒后得到 2,总计运行 6 秒:
use std::thread::sleep;
use std::time::Duration;
fn main() {
println!("Hello, world!");
let file1_content = read1();
println!("{}",file1_content);
let file2_content = read2();
println!("{}",file2_content);
}
fn read1() -> String {
sleep(Duration::new(4,0));
String::from("1")
}
fn read2() -> String {
sleep(Duration::new(2,0));
String::from("2")
}
如果我们将它改造成多线程的运行模式,我们会在 2 秒后得到 2 ,再在 2 秒后得到 1,总计运行 4 秒:
use std::thread::{sleep, self};
use std::time::Duration;
fn main() {
println!("Hello, world!");
let sp1 = thread::spawn(||{
let file1_content = read1();
println!("{}",file1_content);
});
let sp2 = thread::spawn(||{
let file2_content = read2();
println!("{}",file2_content);
});
sp1.join().unwrap();
sp2.join().unwrap();
}
fn read1() -> String {
sleep(Duration::new(4,0));
String::from("1")
}
fn read2() -> String {
sleep(Duration::new(2,0));
String::from("2")
}
最后我们将它改造成异步的,我们需要引入一个依赖:
[dependencies]
tokio = {version = "1", features = ["full"]}
之后将我们的代码改写成这样,运行后我们可以得到和多线程一样的效果,但是这两个任务可以执行在同一个线程上,也可以执行在不同的线程上,这依赖于我们异步运行时的调度:
use std::thread::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
println!("Hello, world!");
let sp1 = tokio::spawn(async {
read1().await;
});
let sp2 = tokio::spawn(async {
read2().await;
});
let _ = tokio::join!(sp1, sp2);
}
async fn read1() -> String {
sleep(Duration::new(4, 0));
println!("1");
String::from("1")
}
async fn read2() -> String {
sleep(Duration::new(2, 0));
println!("2");
String::from("2")
}
理解 Rust 的异步编程
- async
在 rust 中,我们可以使用 async 和 await 关键字来进行异步编程,如果一个函数是 async (异步) 的,那么你需要在调用它的时候加上 await 关键字,在 await 的过程中,CPU可以去分配别的任务,直到 这个进程运行完毕再继续执行后续的操作。同样如果你的函数里使用了需要 await 的异步函数,那么你调用这些函数的函数也是异步的:
比如下面的例子, async_read 是一个可以读取文本内容的异步函数,显然,读取文本是一个 IO 操作,它在进行 IO 操作的过程中可以让CPU 去做别的操作,所以它是异步的。
因为是异步的,所以调用它的时候要使用 await 关键字,这会让它等待直到 IO 完毕后继续执行这个任务。
因为使用了异步的 async_read 函数,那么我们的 hello 函数也是 async 的,所以我们要加上修饰。
async fn hello() {
let content = async_read("a.txt").await;
println!("{}", content);
let content = async_read("b.txt").await;
println!("{}", content);
}
- Future
Rust 的异步是由 Future 机制实现的, Future 是由异步计算或者函数计算产生的值,Rust 异步函数都会返回 Future。
Future 里具有一个 poll 方法,用于检查异步任务是不是完成了,它返回一个枚举 Poll,具有 Pending 和 Ready(val)两个值,Pending 表示没有完成,而 Ready(val)表示异步方法完成了,val 是返回的值。
pub trait Future {
type Output;
pub fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
异步执行器是异步运行时的一部分,它会管理一个 Future 的集合,并通过调用 Future 上的 poll 方法来驱动他们完成,在 Rust 中 Async 是一个语法糖,在加上了 async 之后,就相当于告诉异步执行器它返回一个 Future ,这个 Future 会被驱动直到完成。所以我们在上一个部分中的例子相当于是这样的:
fn read1() -> impl Future<Output = String> {
async {
sleep(Duration::new(4, 0));
println!("1");
String::from("1")
}
}
我们可以将我们的代码改造成这样,其中一个任务使用我们的自己编写的 Future ,它只会返回 Pending ,现在我们运行我们的项目,sp1 它永远不会结束,返回 poll 之后就不再运行了,而 sp2 可以正常运行:
use std::future::Future;
use std::task::Poll;
use std::thread::sleep;
use std::time::Duration;
struct ReadFile {}
impl Future for ReadFile {
type Output = String;
fn poll(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
println!("poll");
Poll::Pending
}
}
#[tokio::main]
async fn main() {
println!("Hello, world!");
let sp1 = tokio::spawn(async {
let future1 = ReadFile{};
future1.await
});
let sp2 = tokio::spawn(async {
read2().await;
});
let _ = tokio::join!(sp1, sp2);
}
fn read2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0));
println!("2");
String::from("2")
}
}
- Waker
那么我们应该怎么样判定一个程序应该被 poll 呢?Rust 使用 Waker 来解决异步执行的问题,当一个任务被 poll 但是返回 Pending 的时候,它就会被注册到 Waker 中,Waker 有一个 wake( ) 方法,告诉异步执行器关联的任务被唤醒了,当 wake( ) 方法被调用了,异步执行器就会被通知再次调用这个任务的 poll 方法。
我们把程序改造成如下的样子,现在我们为我们的程序添加了唤醒,但是我们还是返回 Pending ,此时程序会源源不断的输出 poll ,因为我们的程序执行 poll 方法之后返回 Pending ,那么我们的程序应该被放到 Waker 中,但是我们又执行了 waker().wake_by_ref () 方法,它会告诉异步运行时,我们的项目准备好了,此时我们的程序再次调用 poll 方法,但是得到的还是 Pending ,所以程序会一直输出 poll ,永远不会停止。
use std::future::Future;
use std::task::Poll;
use std::thread::sleep;
use std::time::Duration;
struct ReadFile {}
impl Future for ReadFile {
type Output = String;
fn poll(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
println!("poll");
_cx.waker().wake_by_ref();
Poll::Pending
}
}
#[tokio::main]
async fn main() {
println!("Hello, world!");
let sp1 = tokio::spawn(async {
let future1 = ReadFile{};
future1.await
});
let sp2 = tokio::spawn(async {
read2().await;
});
let _ = tokio::join!(sp1, sp2);
}
fn read2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0));
println!("2");
String::from("2")
}
}
为了能够使得项目异步任务正常结束,我们编写一个逻辑,设定任务的执行时间,如果时间未到,我们就让系统 sleep 直到任务结束,再调用 wake() 函数唤醒进程,现在我们的代码可以达到和 同步、异步和多线程的代码例子 这一节的代码一样的效果了,但是我们通过手动编码的方式模拟了它的运行流程,使得我们可以更加清楚的了解 rust 异步的运行机制:
use std::future::Future;
use std::task::Poll;
use std::thread::sleep;
use std::time::{Duration, Instant};
struct ReadFile {
time: Instant,
}
impl Future for ReadFile {
type Output = String;
fn poll(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if Instant::now() >= self.time {
println!("ready");
Poll::Ready(String::from("1"))
} else {
println!("poll");
let waker = _cx.waker().clone();
let time = self.time;
std::thread::spawn(move || {
let current_time = Instant::now();
if current_time < time {
std::thread::sleep(time - current_time);
}
waker.wake();
});
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
println!("Hello, world!");
let sp1 = tokio::spawn(async {
let future1 = ReadFile {
time: Instant::now() + Duration::from_millis(4000),
};
println!("{:?}", future1.await);
});
let sp2 = tokio::spawn(async {
read2().await;
});
let _ = tokio::join!(sp1, sp2);
}
fn read2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0));
println!("2");
String::from("2")
}
}
Tokio
Rust 只提供关于异步的的最小集合的相关概念,异步的调度,也就是异步运行时的操作需要我们依赖社区提供,比较常用的就是上文提到的 tokio 这个库,官网: https://tokio.rs/
以下是 Tokie 的组成部分:
- Tokio 运行时需要理解 OS 内核的方法来开启 IO 操作
- Tokie 运行时会注册异步的处理程序,以便在事件发生时作为 IO 操作的一部分进行调用
- Tokie 反应器负责从内核监听这些事件并且与 Tokio 其他部分通信
- Tokie 执行器会把一个 Future 取得更多进展的时候,调用 poll 来驱动它的运行
- Future 调用 Tokie Waker 上的 wake 方法,Waker 就会通知执行器,然后把 Future 放回队列,再次调用 poll ,直到 Future 完成
整个 Tokio 的简化运行过程大概是:
- Main 函数在 Tokio 运行时上生成任务 1
- 任务1 有一个 Future ,从一个大文件读取数据
- 从文件读取内容的请求交给系统内核的文件子系统
- 此时,任务2 被交给 Tokio 运行时安排处理
- 当任务1 操作结束的时候,文件子系统会触发一个中断,他被 Tokio 效应器识别
- Tokio 通知任务 1 准备好了
- 任务1 通知他注册的 Waker 说明他可以产生一个值了
- Waker 通知 Tokio 执行器来调用任务1 的poll
- Tokio 执行器安排任务 1 进行处理,调用 poll
- 任务 1 产生一个值