Tokio 是当前 Rust 异步编程使用最广泛的多线程解决方案,除了异步运行态,还包括了各种周边的实现,例如 HTTP 实现 hyper、gRPC 实现 tonic、trace 实现 tracing 等等。
简介
实现 Rust 异步核心包含了如下部分:
Future
一个零成本异步任务的trait
定义,可暂停的异步任务,封装了异步计算的逻辑。Executor
当Future
就绪后,用来调度Futures
运行。EventSources
唤醒对应Future
开始继续运行,对于异步 IO 来说通常称为Reactor
。
当 Future 被调度到 Executor 执行时,会包含如下三个阶段:
Poll
执行器调用Future
执行,直到Future
被阻塞。Wait
此时Future
会返回Poll::Pending
,接着由Reactor
进行跟踪,当等待事件到来时会唤醒。Wake
事件触发相应Future
,接着会被执行器再次调度执行。
通过 Executor
和 Reactor
的组合就对应了所谓的 Event Loop
实现,只是 Rust 并未绑定到某个具体实现。
安装
使用时在 Cargo.toml
的 [dependencies]
配置中添加 tokio = "1.36.0"
配置项即可,不过建议开始使用 tokio = { version = "1.36.0", features = ["full"] }
,否则可能会报 main function is not allowed to be async
的错误。
在熟悉之后可以选择所需的特性,从而减少编译时间以及大小,如下是常见的特性:
macros
支持宏的使用,最常见的就是main
开始位置的#[tokio::main]
使用,同时依赖rt-multi-thread
特性。
示例
在源码的 examples
目录下有很多相关的示例代码,这里以 hello_world.rs
为例。
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::error::Error;
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect("127.0.0.1:6142").await?;
println!("created stream");
let result = stream.write_all(b"hello world\n").await;
println!("wrote to stream; success={:?}", result.is_ok());
Ok(())
}
在上述示例中,增加了 $[tokio::main]
之后,实际上等价于如下实现。
tokio::runtime::Builder::new_multi_thread()
.worker_threads(N)
.enable_all()
.build()
.unwrap()
.block_on(async { ... });
如下以简单的休眠为例,其中的 Duration
是 std::time
中的别名,可以直接使用。
use tokio::time::{sleep_until, Instant, Duration};
#[tokio::main]
async fn main() {
sleep_until(Instant::now() + Duration::from_millis(100)).await;
println!("100 ms have elapsed");
}
任务调度
可以通过 spawn()
调度新任务,包含了 tokio::runtime::Runtime
和 tokio::task
两种方式,使用方式如下。
use std::thread;
use std::time::Duration;
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(async move {
println!("Printing in a future (L1).");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Printing in a future (L2).");
});
thread::sleep(Duration::from_secs(2));
println!("Runtime terminated.");
}
通过 Builder
会返回 runtime
对象,然后在 block_on()
中等待异步任务完成,注意,这只是等待对应任务,并非是等待 runtime
中的所有任务执行完成,因为这里是在 main
函数,所以基本上也等价。
也可以将上述的 rt.spawn()
更换为 rt.block_on()
阻塞执行,这样就不需要在执行 thread::sleep()
了,然后修改 spawn()
的调用。
use std::time::Duration;
fn main() {
let _rt = tokio::runtime::Runtime::new().unwrap();
tokio::spawn(async move {
println!("Printing in a future (L1).");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Printing in a future (L2).");
});
thread::sleep(Duration::from_secs(2));
println!("Runtime terminated.");
}
此时会报 there is no reactor running, must be called from the context of a Tokio 1.x runtime
的错误,解决方法也很简单,需要显示通过 let _guard = rt.enter()
设置上下文。
其它
Stream
提供了类似 Stream
实现,使用示例如下,需要引入 tokio-stream
包。
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() {
let mut stream = tokio_stream::iter(&[1, 2, 3]);
while let Some(v) = stream.next().await {
println!("{}", v);
}
}
参考
- Tokio Tutorial 官方相关的教程,同时有很多底层的介绍。