Rust 异步编程简介

2023-02-26 language rust

异步编程是一个并发编程模型,当前大部分语言都已经支持,只是实现方式略有不同。

简介

多数的语言将异步 runtime 作为本身的一部分开箱即用,例如 NodeJS、Erlang、GoLang 等,不过 Rust 作为系统级语言,并未局限于一种实现,语言本身提供了 Futureasync/await 等基本定义,而调度运行则交给三方实现,例如 futuresasync-stdsmoltokio 等。

标准库提供了如下三部分内容:

  • Future trait 零成本可暂停的异步任务定义,提供 poll() 接口检查任务是否完成。
  • 优雅的任务创建方法,通过 async/await 可以创建、暂停任务。
  • 通用的 Waker 类型,可用于唤醒暂停的 Future 任务。

这里不包含异步 IO 任务的定义,这些异步任务如何创建、运行,所有这些操作都交由三方库实现。

Future

如下是核心库中对 Future 的定义。

// core/task/poll.rs
pub enum Poll<T> {
    Ready(T),
    Pending,
}

// core/future/future.rs
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

老版本的 poll 函数中 Context 定义为 cx: &mut Context<'_>,这里通过 '_ 标识一个匿名生命周期。这个 Future 比较简单,具有如下功能:

  • 可以执行 poll() 操作。
  • 当执行 poll() 操作时,可能返回 Pending 或者 Ready 状态。
  • 如果返回 Pending 状态,则在后面继续执行 poll() 操作。
  • 如果返回 Ready 则返回对应值,其中的 Output 就是 Future 结束后输出的内容。

Waker

如上的 Context 没有介绍,其中同时包含了核心的 Waker 定义。

// core/task/wake.rs
struct RawWakerVTable {
  clone: unsafe fn(*const ()) -> RawWaker,  // 用来复制Waker数据
  wake: unsafe fn(*const ()),  // 用来唤醒相关对象
  wake_by_ref: unsafe fn(*const ()),  // 类似于wake函数,但不会消耗引用
  drop: unsafe fn(*const ())  // Waker删除时调用
}
struct RawWaker {
  data: *const (),  // 保存了 Executor 相关数据,一般是 Task 相关结构体,后面虚拟表均会传入该变量
  vtable: &'static RawWakerVTable // 函数虚拟表,用来定义 Waker 相关的行为
}
pub struct Waker {
    waker: RawWaker,
}
pub struct Context<'a> {
    waker: &'a Waker,
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
    _marker2: PhantomData<*mut ()>,
}

详细使用

如下是一个简单的 Future 实现示例,每次执行都是立即结束,并返回一个随机值。

use std::{future::Future, pin::Pin, task::Context}

#[derive(Default)]
struct RandFuture;

impl Future for RandFuture {
	type Output = u16; // 当执行结束时应该返回的对象
	fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output  {
		Poll::ready(rand::random())
	}
}

上述代码可以直接调用 rand::random() 生成随机值,只是这里简单借用了 Future 实现。实际使用时,一般在实现一些比较耗时的 IO 操作时会使用 Future,而且通常是通过 async/await 实现,如下是使用 futures 的简单调用。

use futures::executor::block_on;

async fn hey() {
    println!("Hey World!!!");
}

async fn hello() {
    hey().await;
    println!("Hello World!!!");
}

fn main() {
    block_on(hello());
}

这里的 async/await 是语法糖,通过 async 关键字会返回一个匿名的 Future trait object,这是一个实现了 FutureGenerator,本质是一个状态机。

语法糖处理之后的代码可以查看 HIR 格式,这是 Rust 的中间表示,可以比较直观地查看程序执行起来大致的样子,可以通过 RUSTFLAGS="--emit hir" cargo build 方式生成,此时在 target/debug/deps 目录下会生成一个 .hir 文件,甚至可以生成 SVG 查看。另外,对于简单的示例,也可以在 play.rust-lang.org 中也可以查看。

上述的代码会生成类似如下的最终代码,注意,这里只是为了方便展示。

enum HelloState {
    Start,
    Await1(HeyFuture),
    Done,
}

impl Future for HelloState {
    type Output = ();
    fn poll(&mut self: HelloState, ctx: &mut Context<'_>) -> Poll<Output> {
        match self {
            HelloState::Start => {
                let hey = HeyFuture { ready: false };
                *self = HelloState::Await1(hey);
                self.poll();              // re-poll after first state change
            },
            HelloState::Await1(hey) => {
                match hey.poll(ctx) {      // await by poll, and print Hey World
                    Poll::Pending => {
                        Poll::Pending
                    },
                    Poll::Ready(output) => {
                        println!("Hello World!!!"); // code after await
                        *self = HelloState::Done;
                        let output = ();
                        Poll::Ready(output)
                    }
                }
            },
            HelloState::Done => {
                panic!("can't go here")
            }
        }
    }
}

也就是 async/await 会生成类似状态机的实现,这里会衍生一个重要概念 Leaf FutureNon-Leaf Future 的区别。

简单来说,Future 是分层级的,通常通过 async 创建的代码块是 Non-Leaf Future,而 await 真正等待的则是 Leaf Future,例如 IOTimer 等,只有 Leaf Future 才会注册到 Reactor 上等待事件发生,而这些 Leaf Future 就需要具体的运行态来实现。

基本原理

通过 asycn .await 有效降低了 Rust 中的异步编程,通过 async 会返回一个 Future Object 用来标识一个正在进行的异步计算,如下的两个函数声明等价。

async fn fetch(db: &DB) -> String {...}

fn fetch<'a>(db: &'a DB) -> impl Future<Output = String> + 'a {
    async move {...}
}

也就是说,async 实际上是个语法糖,编译器会自动生成相关的匿名类,也因为这个原因,导致直接在 trait 中定义异步接口变的很复杂,可以参考 why async fn in traits are hard 以及 async fn in trait MVP comes to nightly

Pin VS. Unpin

其中 Pin 是为了标识某个变量不能在内存中移动,而 Unpin 则允许移动。考虑异步编程中引用了某个变量,例如:

async {
    let mut data = [0; 256];
    let fetch_future = fetch_data(&mut data);
    fetch_future.await;
    println!("{:?}", data);
}

会转换为类似如下的数据结构。

struct DataBuff<'a> {
    buff: &'a mut[u8], // 指向下面的data
}
struct AsyncFuture {
    data: [u8; 256];
    fetch_future: DataBuff<'_>,
}

也就是在结构体内部会有指针指向自己,如果 AsyncFuture 被移动,那么对应的 data 地址也发生了变动,但是,引用地址却没有修改,从而会导致不合法访问。解决方式就是让 Future 不移动即可。

基本概念

Rust 1.33 版本中引入了 Pin 相关的 API 实现,包括了:std::pin::Pin std::marker::{Unpin, PhantomPinned} impl !Unpin for T,部分实现如下。

// core/src/pin.rs
pub struct Pin<Ptr> {
    pub __pointer: Ptr,
}

impl<Ptr: Deref> Deref for Pin<Ptr> {
    type Target = Ptr::Target;
    fn deref(&self) -> &Ptr::Target {
        Pin::get_ref(Pin::as_ref(self))
    }
}

impl<Ptr: DerefMut<Target: Unpin>> DerefMut for Pin<Ptr> {
    fn deref_mut(&mut self) -> &mut Ptr::Target {
        Pin::get_mut(Pin::as_mut(self))
    }
}

pub auto trait Unpin {}
pub struct PhantomPinned;
impl !Unpin for PhantomPinned {}

Pin 是一个结构体,并实现了 Deref DerefMut 特征,这也就意味着是一个智能指针,其内部只能包含一个指针 (只能是指针),假设其指向的是 T 类型,只要 T 没有实现 Unpin 特征,那么就能够保证 T 永远不会被移动。

这也就意味着,如果 T 实现了 Unpin 特征,那么 Pin 的作用也就完全失效了,此时 Pin<P<T>> 就等价于 P<T> 了。

但是编译器会默认给所有类实现 Unpin 特征,其中有两个例外,实现的是 !Unpin:A) PhantomPinned 以及作为成员的类;B) 为 async 自动生成的 impl Future 结构体。后者很容易理解,而前者实际就是为了用户可以 Pin 自定义类型。

使用 Safe Rust 的时候,不被移动的核心原理就是,避免暴露可变指针 &mut

示例

当某个类型声明了为 Pin 之后,可以确保实现 !Unpin 的结构体不发生移动。

use std::{marker::PhantomPinned, pin::Pin};

struct Data(u32);
impl Data {
    fn dump(&self) {
        println!("{} {:p}", self.0, self);
    }
}

struct PinData(u32, PhantomPinned);
impl PinData {
    // 通过Pin能确保打印的是相同值
    fn dump(self: Pin<&Self>) {
        println!("{} {:p}", self.0, self);
    }
}

fn main() {
    let heap = Box::new(Data(42)); // on heap
    heap.dump();
    let stack = *heap; // moved back to stack
    stack.dump();

    let pinned = Box::pin(Data(42));
    pinned.as_ref().dump();
    let unpined = Pin::into_inner(pinned);
    let stack = *unpined;
    let pinned = Box::pin(stack);
    pinned.as_ref().dump();

    let pinned = Box::pin(PinData(42, PhantomPinned));
    pinned.as_ref().dump();
    //let unpined = Pin::into_inner(pinned);  // 这里执行会失败
}

总结

  • 实现了 Unpin 特征,可以通过 Pin::get_mut() 或者 &mut T (因为实现了 DerefMut 特征) 获取,不确定具体使用场景。
  • 如果不想自定义结构体移动,添加 _marker: PhantomPinned 即可,或者应用 nightly 中的 !Unpin 特征。
  • 对于 Unsafe 代码可以强制通过 get_unchecked_mut() 获取 &mut T 类型,不过需要保证如下的契约。
  • 简单来说,Pin 用来标识 T 不会再被移动,Unpin 意味着允许移动,!Unpin 不允许取消 Pin 动作。
  • 使用 PhantomPinned 的作用是,一旦结构体被 Pin 了,那么就不能 Unpin

上述说的契约为,对于 Pin<P<T>> 类型:

  • T 实现了 Unpin,那么 P<T> 从被 Pin 到销毁,需要保证 P<T> 不被订住。
  • T 实现了 !Unpin,那么 P<T> 从被 Pin 到销毁,需要保证 P<T> 被订住。

参考

  • 对于 Pin 异步编程中的 Pinning 章节介绍,最后有相关的总结。