Rust Tokio 简介

2023-12-16 rust language

Tokio 是当前 Rust 异步编程使用最广泛的多线程解决方案,除了异步运行态,还包括了各种周边的实现,例如 HTTP 实现 hyper、gRPC 实现 tonic、trace 实现 tracing 等等。

简介

实现 Rust 异步核心包含了如下部分:

  • Future 一个零成本异步任务的 trait 定义,可暂停的异步任务,封装了异步计算的逻辑。
  • ExecutorFuture 就绪后,用来调度 Futures 运行。
  • EventSources 唤醒对应 Future 开始继续运行,对于异步 IO 来说通常称为 Reactor

当 Future 被调度到 Executor 执行时,会包含如下三个阶段:

  • Poll 执行器调用 Future 执行,直到 Future 被阻塞。
  • Wait 此时 Future 会返回 Poll::Pending,接着由 Reactor 进行跟踪,当等待事件到来时会唤醒。
  • Wake 事件触发相应 Future,接着会被执行器再次调度执行。

通过 ExecutorReactor 的组合就对应了所谓的 Event Loop 实现,只是 Rust 并未绑定到某个具体实现。

安装

使用时在 Cargo.toml[dependencies] 配置中添加 tokio = "1.36.0" 配置项即可,不过建议开始使用 tokio = { version = "1.36.0", features = ["full"] },否则可能会报 main function is not allowed to be async 的错误。

在熟悉之后可以选择所需的特性,从而减少编译时间以及大小,如下是常见的特性:

  • macros 支持宏的使用,最常见的就是 main 开始位置的 #[tokio::main] 使用,同时依赖 rt-multi-thread 特性。

示例

在源码的 examples 目录下有很多相关的示例代码,这里以 hello_world.rs 为例。

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

use std::error::Error;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:6142").await?;
    println!("created stream");

    let result = stream.write_all(b"hello world\n").await;
    println!("wrote to stream; success={:?}", result.is_ok());

    Ok(())
}

在上述示例中,增加了 $[tokio::main] 之后,实际上等价于如下实现。

tokio::runtime::Builder::new_multi_thread()
      .worker_threads(N)
      .enable_all()
      .build()
      .unwrap()
      .block_on(async { ... });

如下以简单的休眠为例,其中的 Durationstd::time 中的别名,可以直接使用。

use tokio::time::{sleep_until, Instant, Duration};

#[tokio::main]
async fn main() {
    sleep_until(Instant::now() + Duration::from_millis(100)).await;
    println!("100 ms have elapsed");
}

任务调度

可以通过 spawn() 调度新任务,包含了 tokio::runtime::Runtimetokio::task 两种方式,使用方式如下。

use std::thread;
use std::time::Duration;

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.spawn(async move {
        println!("Printing in a future (L1).");
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("Printing in a future (L2).");
    });

    thread::sleep(Duration::from_secs(2));
    println!("Runtime terminated.");
}

通过 Builder 会返回 runtime 对象,然后在 block_on() 中等待异步任务完成,注意,这只是等待对应任务,并非是等待 runtime 中的所有任务执行完成,因为这里是在 main 函数,所以基本上也等价。

也可以将上述的 rt.spawn() 更换为 rt.block_on() 阻塞执行,这样就不需要在执行 thread::sleep() 了,然后修改 spawn() 的调用。

use std::time::Duration;

fn main() {
    let _rt = tokio::runtime::Runtime::new().unwrap();
    tokio::spawn(async move {
        println!("Printing in a future (L1).");
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("Printing in a future (L2).");
    });

    thread::sleep(Duration::from_secs(2));
    println!("Runtime terminated.");
}

此时会报 there is no reactor running, must be called from the context of a Tokio 1.x runtime 的错误,解决方法也很简单,需要显示通过 let _guard = rt.enter() 设置上下文。

其它

Stream

提供了类似 Stream 实现,使用示例如下,需要引入 tokio-stream 包。

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);
    while let Some(v) = stream.next().await {
        println!("{}", v);
    }
}

参考

  • Tokio Tutorial 官方相关的教程,同时有很多底层的介绍。