简介
核心在 futures::stream
由于 Rust 的异步代码需要依赖一个异步运行时 (runtime) 才能执行,这里的示例代码使用 Tokio。
use futures::StreamExt;
/// Consumes the same "keep evens, then double" stream pipeline in two ways:
/// item by item through `next()`, and all at once through `collect()`.
#[tokio::main]
async fn main() {
    // Pipeline stages shared by both demonstrations. `filter` expects a
    // future resolving to `bool`, hence the already-completed `ready` future.
    let keep_even = |n: &i32| futures::future::ready(n % 2 == 0);
    let double = |n: i32| n * 2;

    // Pull the items one at a time until the stream reports `None`.
    let mut doubled = futures::stream::iter(1..10)
        .filter(keep_even)
        .map(double);
    while let Some(x) = doubled.next().await {
        println!("Data {x}");
    }

    // Build the pipeline again and gather every item into a vector.
    let everything: Vec<_> = futures::stream::iter(1..10)
        .filter(keep_even)
        .map(double)
        .collect()
        .await;
    println!("All {:?}", everything);
}
其特征在 futures-core/src/stream.rs
中定义,如下。
/// Asynchronous counterpart of an iterator: a source of values that is
/// polled for its next item instead of being asked for it synchronously.
///
/// Quoted from `futures-core`; see the upstream documentation for the full
/// contract.
pub trait Stream {
/// Type of the values produced by the stream.
type Item;
/// Attempts to pull the next value out of the stream.
///
/// The result is wrapped in `Poll<Option<_>>`; per the upstream
/// documentation, `Poll::Pending` means no value is ready yet (the task is
/// woken through `cx` later), `Poll::Ready(Some(item))` delivers a value,
/// and `Poll::Ready(None)` signals that the stream has finished.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
示例
Then VS. Map
调用 then()
时会对每个元素依次执行并等待 (await) 对应的异步函数,产出的是异步函数的结果;而 map()
只是把每个元素转换为 Future
类型,并不会执行它;因此 map() 之后通常还需要接 buffered() 或 buffer_unordered()
来驱动上述的 Future
类型并发执行,如下是常用的示例。
use futures::stream::{StreamExt, TryStreamExt};
use std::time::Duration;
use tokio::time::sleep;
/// Prints a greeting for `idx`, asynchronously sleeps for one second, and
/// returns the three consecutive values `idx`, `idx + 1`, `idx + 2`.
async fn hello(idx: usize) -> Vec<usize> {
    println!("Hello idx={idx}");

    // Stand-in for real asynchronous work.
    let pause = Duration::from_secs(1);
    sleep(pause).await;

    let mut values = Vec::with_capacity(3);
    values.extend(idx..idx + 3);
    values
}
#[tokio::main]
async fn main() {
let data = futures::stream::iter(1..3)
.then(|i| hello(i))
//.then(|i| async move { hello(i).await }) // equal
.collect::<Vec<_>>()
.await;
println!("All {:?}", data);
let data = futures::stream::iter(1..5)
.for_each(|c| async move {
println!("{c}");
})
.await;
println!("{:?}", data);
let mut x = 0i32;
let data = stream::repeat(Ok(1))
.try_for_each(|i| {
x += i;
future::ready(if x == 3 { Err(()) } else { Ok(()) })
})
.await;
println!("{:?}", data);
let data = futures::stream::iter(1..5)
.map(|i| async move {
let data = hello(i).await;
if i == 2 {
Err::<_, String>("error".to_string())
} else {
Ok::<_, String>(data)
}
})
//.buffer_unordered(10)
.buffered(10)
.try_for_each(|c| async move {
println!("{:?}", c);
Ok(())
})
.await;
println!("{:?}", data);
}
当返回 Err
时,try_for_each() 会立即停止处理后续元素,并将该错误作为最终结果返回。
自定义错误
很多第三方库都需要实现自定义的错误类型,可以参考如下示例。
use futures::stream::{StreamExt, TryStreamExt};
use std::collections::HashSet;
use std::time::Duration;
use tokio::time::sleep;
/// Error type used by the examples below.
#[derive(Debug)]
pub enum Error {
    /// An internal failure described by a free-form message.
    Internal(String),
}

// `Display` provides the human-readable message and is a prerequisite for
// implementing `std::error::Error`.
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::Internal(msg) => write!(f, "internal error: {msg}"),
        }
    }
}

// Implementing the standard trait lets this type be boxed as
// `Box<dyn std::error::Error>` and work with generic error-handling code.
impl std::error::Error for Error {}
/// `Result` alias whose error type defaults to this file's `Error`, so
/// signatures can be written as `Result<T>`; a different error type can
/// still be supplied as the second parameter.
type Result<T, E = Error> = std::result::Result<T, E>;
/// Prints a greeting for `idx`, asynchronously sleeps for one second, and
/// returns `Ok` with the three consecutive values starting at `idx`.
///
/// This example never produces an `Err`; the `Result` return type exists to
/// show how a custom error type flows through the stream combinators.
async fn hello(idx: usize) -> Result<Vec<usize>> {
    println!("Hello idx={idx}");

    // Stand-in for real asynchronous work.
    let pause = Duration::from_secs(1);
    sleep(pause).await;

    let values: Vec<usize> = (idx..idx + 3).collect();
    Ok(values)
}
#[tokio::main]
async fn main() {
let value: HashSet<usize> = vec![1usize, 2, 3].into_iter().collect();
//let data = futures::stream::iter(1..5)
let data = futures::stream::iter(value)
.map(|i| async move {
let data = hello(i).await?;
Ok::<_, Error>(data)
})
.buffer_unordered(10) // .buffered(10)
.try_for_each(|c| async move {
println!("{:?}", c);
Ok(())
})
.await;
println!("{:?}", data);
}