异步编程是一个并发编程模型,当前大部分语言都已经支持,只是实现方式略有不同。
简介
多数的语言将异步 runtime 作为本身的一部分开箱即用,例如 NodeJS、Erlang、GoLang 等,不过 Rust 作为系统级语言,并未局限于一种实现,语言本身提供了 Future
、async/await
等基本定义,而调度运行则交给三方实现,例如 futures
、async-std
、smol
、tokio
等。
标准库提供了如下三部分内容:
Future trait
零成本可暂停的异步任务定义,提供poll()
接口检查任务是否完成。- 优雅的任务创建方法,通过
async/await
可以创建、暂停任务。 - 通用的
Waker
类型,可用于唤醒暂停的Future
任务。
这里不包含异步 IO 任务的定义,这些异步任务如何创建、运行,所有这些操作都交由三方库实现。
Future
如下是核心库中对 Future
的定义。
// core/task/poll.rs
pub enum Poll<T> {
Ready(T),
Pending,
}
// core/future/future.rs
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
老版本的 poll 函数中 Context 定义为 cx: &mut Context<'_>
,这里通过 '_
标识一个匿名生命周期。这个 Future 比较简单,具有如下功能:
- 可以执行
poll()
操作。 - 当执行
poll()
操作时,可能返回Pending
或者Ready
状态。 - 如果返回
Pending
状态,则在后面继续执行poll()
操作。 - 如果返回
Ready
则返回对应值,其中的Output
就是Future
结束后输出的内容。
Waker
如上的 Context
没有介绍,其中同时包含了核心的 Waker
定义。
// core/task/wake.rs
struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker, // 用来复制Waker数据
wake: unsafe fn(*const ()), // 用来唤醒相关对象
wake_by_ref: unsafe fn(*const ()), // 类似于wake函数,但不会消耗引用
drop: unsafe fn(*const ()) // Waker删除时调用
}
struct RawWaker {
data: *const (), // 保存了 Executor 相关数据,一般是 Task 相关结构体,后面虚拟表均会传入该变量
vtable: &'static RawWakerVTable // 函数虚拟表,用来定义 Waker 相关的行为
}
pub struct Waker {
waker: RawWaker,
}
pub struct Context<'a> {
waker: &'a Waker,
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
_marker2: PhantomData<*mut ()>,
}
详细使用
如下是一个简单的 Future
实现示例,每次执行都是立即结束,并返回一个随机值。
use std::{future::Future, pin::Pin, task::Context}
#[derive(Default)]
struct RandFuture;
impl Future for RandFuture {
type Output = u16; // 当执行结束时应该返回的对象
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output {
Poll::ready(rand::random())
}
}
上述代码可以直接调用 rand::random()
生成随机值,只是这里简单借用了 Future
实现。实际使用时,一般在实现一些比较耗时的 IO 操作时会使用 Future
,而且通常是通过 async/await
实现,如下是使用 futures
的简单调用。
use futures::executor::block_on;
async fn hey() {
println!("Hey World!!!");
}
async fn hello() {
hey().await;
println!("Hello World!!!");
}
fn main() {
block_on(hello());
}
这里的 async/await
是语法糖,通过 async
关键字会返回一个匿名的 Future trait object
,这是一个实现了 Future
的 Generator
,本质是一个状态机。
语法糖处理之后的代码可以查看 HIR 格式,这是 Rust 的中间表示,可以比较直观地查看程序执行起来大致的样子,可以通过
RUSTFLAGS="--emit hir" cargo build
方式生成,此时在target/debug/deps
目录下会生成一个.hir
文件,甚至可以生成 SVG 查看。另外,对于简单的示例,也可以在 play.rust-lang.org 中也可以查看。
上述的代码会生成类似如下的最终代码,注意,这里只是为了方便展示。
enum HelloState {
Start,
Await1(HeyFuture),
Done,
}
impl Future for HelloState {
type Output = ();
fn poll(&mut self: HelloState, ctx: &mut Context<'_>) -> Poll<Output> {
match self {
HelloState::Start => {
let hey = HeyFuture { ready: false };
*self = HelloState::Await1(hey);
self.poll(); // re-poll after first state change
},
HelloState::Await1(hey) => {
match hey.poll(ctx) { // await by poll, and print Hey World
Poll::Pending => {
Poll::Pending
},
Poll::Ready(output) => {
println!("Hello World!!!"); // code after await
*self = HelloState::Done;
let output = ();
Poll::Ready(output)
}
}
},
HelloState::Done => {
panic!("can't go here")
}
}
}
}
也就是 async/await
会生成类似状态机的实现,这里会衍生一个重要概念 Leaf Future
和 Non-Leaf Future
的区别。
简单来说,Future
是分层级的,通常通过 async
创建的代码块是 Non-Leaf Future
,而 await
真正等待的则是 Leaf Future
,例如 IO
、Timer
等,只有 Leaf Future
才会注册到 Reactor
上等待事件发生,而这些 Leaf Future
就需要具体的运行态来实现。
基本原理
通过 asycn
.await
有效降低了 Rust 中的异步编程,通过 async
会返回一个 Future Object 用来标识一个正在进行的异步计算,如下的两个函数声明等价。
async fn fetch(db: &DB) -> String {...}
fn fetch<'a>(db: &'a DB) -> impl Future<Output = String> + 'a {
async move {...}
}
也就是说,async
实际上是个语法糖,编译器会自动生成相关的匿名类,也因为这个原因,导致直接在 trait
中定义异步接口变的很复杂,可以参考 why async fn in traits are hard 以及 async fn in trait MVP comes to nightly。
Pin VS. Unpin
其中 Pin
是为了标识某个变量不能在内存中移动,而 Unpin
则允许移动。考虑异步编程中引用了某个变量,例如:
async {
let mut data = [0; 256];
let fetch_future = fetch_data(&mut data);
fetch_future.await;
println!("{:?}", data);
}
会转换为类似如下的数据结构。
struct DataBuff<'a> {
buff: &'a mut[u8], // 指向下面的data
}
struct AsyncFuture {
data: [u8; 256];
fetch_future: DataBuff<'_>,
}
也就是在结构体内部会有指针指向自己,如果 AsyncFuture
被移动,那么对应的 data
地址也发生了变动,但是,引用地址却没有修改,从而会导致不合法访问。解决方式就是让 Future
不移动即可。
基本概念
在 Rust 1.33
版本中引入了 Pin 相关的 API 实现,包括了:std::pin::Pin
std::marker::{Unpin, PhantomPinned}
impl !Unpin for T
,部分实现如下。
// core/src/pin.rs
pub struct Pin<Ptr> {
pub __pointer: Ptr,
}
impl<Ptr: Deref> Deref for Pin<Ptr> {
type Target = Ptr::Target;
fn deref(&self) -> &Ptr::Target {
Pin::get_ref(Pin::as_ref(self))
}
}
impl<Ptr: DerefMut<Target: Unpin>> DerefMut for Pin<Ptr> {
fn deref_mut(&mut self) -> &mut Ptr::Target {
Pin::get_mut(Pin::as_mut(self))
}
}
pub auto trait Unpin {}
pub struct PhantomPinned;
impl !Unpin for PhantomPinned {}
Pin
是一个结构体,并实现了 Deref
DerefMut
特征,这也就意味着是一个智能指针,其内部只能包含一个指针 (只能是指针),假设其指向的是 T
类型,只要 T
没有实现 Unpin
特征,那么就能够保证 T
永远不会被移动。
这也就意味着,如果 T
实现了 Unpin
特征,那么 Pin
的作用也就完全失效了,此时 Pin<P<T>>
就等价于 P<T>
了。
但是编译器会默认给所有类实现 Unpin
特征,其中有两个例外,实现的是 !Unpin
:A) PhantomPinned
以及作为成员的类;B) 为 async
自动生成的 impl Future
结构体。后者很容易理解,而前者实际就是为了用户可以 Pin
自定义类型。
使用 Safe Rust
的时候,不被移动的核心原理就是,避免暴露可变指针 &mut
。
示例
当某个类型声明了为 Pin
之后,可以确保实现 !Unpin
的结构体不发生移动。
use std::{marker::PhantomPinned, pin::Pin};
struct Data(u32);
impl Data {
fn dump(&self) {
println!("{} {:p}", self.0, self);
}
}
struct PinData(u32, PhantomPinned);
impl PinData {
// 通过Pin能确保打印的是相同值
fn dump(self: Pin<&Self>) {
println!("{} {:p}", self.0, self);
}
}
fn main() {
let heap = Box::new(Data(42)); // on heap
heap.dump();
let stack = *heap; // moved back to stack
stack.dump();
let pinned = Box::pin(Data(42));
pinned.as_ref().dump();
let unpined = Pin::into_inner(pinned);
let stack = *unpined;
let pinned = Box::pin(stack);
pinned.as_ref().dump();
let pinned = Box::pin(PinData(42, PhantomPinned));
pinned.as_ref().dump();
//let unpined = Pin::into_inner(pinned); // 这里执行会失败
}
总结
- 实现了
Unpin
特征,可以通过Pin::get_mut()
或者&mut T
(因为实现了DerefMut
特征) 获取,不确定具体使用场景。 - 如果不想自定义结构体移动,添加
_marker: PhantomPinned
即可,或者应用nightly
中的!Unpin
特征。 - 对于 Unsafe 代码可以强制通过
get_unchecked_mut()
获取&mut T
类型,不过需要保证如下的契约。 - 简单来说,
Pin
用来标识T
不会再被移动,Unpin
意味着允许移动,!Unpin
不允许取消Pin
动作。 - 使用
PhantomPinned
的作用是,一旦结构体被Pin
了,那么就不能Unpin
。
上述说的契约为,对于 Pin<P<T>>
类型:
T
实现了Unpin
,那么P<T>
从被 Pin 到销毁,需要保证P<T>
不被订住。T
实现了!Unpin
,那么P<T>
从被 Pin 到销毁,需要保证P<T>
被订住。
参考
- 对于 Pin 异步编程中的 Pinning 章节介绍,最后有相关的总结。