Rust 异步编程简介

2023-02-26 language rust

异步编程是一个并发编程模型,当前大部分语言都已经支持,只是实现方式略有不同。

简介

多数的语言将异步 runtime 作为本身的一部分开箱即用,例如 NodeJS、Erlang、GoLang 等,不过 Rust 作为系统级语言,并未局限于一种实现,语言本身提供了 Futureasync/await 等基本定义,而调度运行则交给三方实现,例如 futuresasync-stdsmoltokio 等。

标准库提供了如下三部分内容:

  • Future trait 零成本可暂停的异步任务定义,提供 poll() 接口检查任务是否完成。
  • 优雅的任务创建方法,通过 async/await 可以创建、暂停任务。
  • 通用的 Waker 类型,可用于唤醒暂停的 Future 任务。

这里不包含异步 IO 任务的定义,这些异步任务如何创建、运行,所有这些操作都交由三方库实现。

Future

如下是核心库中对 Future 的定义。

// core/task/poll.rs
pub enum Poll<T> {
    Ready(T),
    Pending,
}

// core/future/future.rs
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

老版本的 poll 函数中 Context 定义为 cx: &mut Context<'_>,这里通过 '_ 标识一个匿名生命周期。这个 Future 比较简单,具有如下功能:

  • 可以执行 poll() 操作。
  • 当执行 poll() 操作时,可能返回 Pending 或者 Ready 状态。
  • 如果返回 Pending 状态,则在后面继续执行 poll() 操作。
  • 如果返回 Ready 则返回对应值,其中的 Output 就是 Future 结束后输出的内容。

Waker

如上的 Context 没有介绍,其中同时包含了核心的 Waker 定义。

// core/task/wake.rs
struct RawWakerVTable {
  clone: unsafe fn(*const ()) -> RawWaker,  // 用来复制Waker数据
  wake: unsafe fn(*const ()),  // 用来唤醒相关对象
  wake_by_ref: unsafe fn(*const ()),  // 类似于wake函数,但不会消耗引用
  drop: unsafe fn(*const ())  // Waker删除时调用
}
struct RawWaker {
  data: *const (),  // 保存了 Executor 相关数据,一般是 Task 相关结构体,后面虚拟表均会传入该变量
  vtable: &'static RawWakerVTable // 函数虚拟表,用来定义 Waker 相关的行为
}
pub struct Waker {
    waker: RawWaker,
}
pub struct Context<'a> {
    waker: &'a Waker,
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
    _marker2: PhantomData<*mut ()>,
}

详细使用

如下是一个简单的 Future 实现示例,每次执行都是立即结束,并返回一个随机值。

use std::{future::Future, pin::Pin, task::Context}

#[derive(Default)]
struct RandFuture;

impl Future for RandFuture {
	type Output = u16; // 当执行结束时应该返回的对象
	fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output  {
		Poll::ready(rand::random())
	}
}

上述代码可以直接调用 rand::random() 生成随机值,只是这里简单借用了 Future 实现。实际使用时,一般在实现一些比较耗时的 IO 操作时会使用 Future,而且通常是通过 async/await 实现,如下是使用 futures 的简单调用。

use futures::executor::block_on;

async fn hey() {
    println!("Hey World!!!");
}

async fn hello() {
    hey().await;
    println!("Hello World!!!");
}

fn main() {
    block_on(hello());
}

这里的 async/await 是语法糖,通过 async 关键字会返回一个匿名的 Future trait object,这是一个实现了 FutureGenerator,本质是一个状态机。

语法糖处理之后的代码可以查看 HIR 格式,这是 Rust 的中间表示,可以比较直观地查看程序执行起来大致的样子,可以通过 RUSTFLAGS="--emit hir" cargo build 方式生成,此时在 target/debug/deps 目录下会生成一个 .hir 文件,甚至可以生成 SVG 查看。另外,对于简单的示例,也可以在 play.rust-lang.org 中也可以查看。

上述的代码会生成类似如下的最终代码,注意,这里只是为了方便展示。

enum HelloState {
    Start,
    Await1(HeyFuture),
    Done,
}

impl Future for HelloState {
    type Output = ();
    fn poll(&mut self: HelloState, ctx: &mut Context<'_>) -> Poll<Output> {
        match self {
            HelloState::Start => {
                let hey = HeyFuture { ready: false };
                *self = HelloState::Await1(hey);
                self.poll();              // re-poll after first state change
            },
            HelloState::Await1(hey) => {
                match hey.poll(ctx) {      // await by poll, and print Hey World
                    Poll::Pending => {
                        Poll::Pending
                    },
                    Poll::Ready(output) => {
                        println!("Hello World!!!"); // code after await
                        *self = HelloState::Done;
                        let output = ();
                        Poll::Ready(output)
                    }
                }
            },
            HelloState::Done => {
                panic!("can't go here")
            }
        }
    }
}

也就是 async/await 会生成类似状态机的实现,这里会衍生一个重要概念 Leaf FutureNon-Leaf Future 的区别。

简单来说,Future 是分层级的,通常通过 async 创建的代码块是 Non-Leaf Future,而 await 真正等待的则是 Leaf Future,例如 IOTimer 等,只有 Leaf Future 才会注册到 Reactor 上等待事件发生,而这些 Leaf Future 就需要具体的运行态来实现。

基本原理

通过 asycn .await 有效降低了 Rust 中的异步编程,通过 async 会返回一个 Future Object 用来标识一个正在进行的异步计算,如下的两个函数声明等价。

async fn fetch(db: &DB) -> String {...}

fn fetch<'a>(db: &'a DB) -> impl Future<Output = String> + 'a {
    async move {...}
}

也就是说,async 实际上是个语法糖,编译器会自动生成相关的匿名类,也因为这个原因,导致直接在 trait 中定义异步接口变的很复杂,可以参考 why async fn in traits are hard 以及 async fn in trait MVP comes to nightly