Rust Async Stream Programming Explained

2023-09-16 language rust

Introduction

The core functionality lives in the futures::stream module. Because async code in Rust needs an async runtime to drive it, the examples here use Tokio.
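What does the runtime actually do? At minimum, something has to call poll on a future repeatedly and sleep until the waker signals progress. The std-only sketch below shows that loop. block_on and ThreadWaker are our own illustrative names. The real functions futures::executor::block_on and Tokio's Runtime::block_on are far more complete, and Tokio also provides scheduling, timers and I/O, none of which this sketch attempts.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: polls a single future on the current
/// thread until it is ready, parking the thread between polls.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until the future's waker calls unpark (spurious wakeups just re-poll).
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let answer = block_on(async { 40 + 2 });
    println!("{answer}"); // 42
}
```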

use futures::StreamExt;

#[tokio::main]
async fn main() {
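    // filter() expects a closure that returns a Future<Output = bool>, hence future::ready
    let mut data = futures::stream::iter(1..10)
        .filter(|x| futures::future::ready(x % 2 == 0))
        .map(|x| x * 2);
    // next() yields Some(item) until the stream is exhausted, then None
    while let Some(x) = data.next().await {
        println!("Data {x}");
    }

    // collect() drives the whole stream and gathers every item into a Vec
    let data = futures::stream::iter(1..10)
        .filter(|x| futures::future::ready(x % 2 == 0))
        .map(|x| x * 2)
        .collect::<Vec<_>>()
        .await;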
    println!("All {:?}", data);
}
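The stream adapters mirror the adapters on std's Iterator. The synchronous version of the same pipeline below uses plain i32 values instead of a stream and shows the expected result. The main difference from the async code is the filter closure. StreamExt::filter expects a closure that returns a future, which is why the async code wraps the bool in futures::future::ready. Iterator::filter takes a plain bool predicate instead.

```rust
/// Synchronous counterpart of the stream pipeline above.
fn evens_doubled() -> Vec<i32> {
    (1..10)
        .filter(|x| x % 2 == 0) // keep 2, 4, 6, 8
        .map(|x| x * 2) // double each one
        .collect()
}

fn main() {
    println!("All {:?}", evens_doubled()); // All [4, 8, 12, 16]
}
```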

The trait itself is defined in futures-core/src/stream.rs as follows.

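use core::pin::Pin;
use core::task::{Context, Poll};

pub trait Stream {
    /// Type of the values yielded by the stream.
    type Item;

    /// Ready(Some(item)): an item is available; Ready(None): the stream has ended;
    /// Pending: not ready yet, the waker in `cx` will be notified later.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    /// Bounds on the remaining length; the default means "unknown".
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}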
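The sketch below shows how poll_next is used in practice. It implements a local copy of the trait for a hypothetical Counter stream and drives it by hand. The local Stream trait, Counter and drain are illustrative assumptions, not part of futures-core. A real executor would wait for the waker instead of busy-polling on Pending.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the trait above (size_hint omitted for brevity).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stream yielding `next..end`, then finishing.
struct Counter {
    next: u32,
    end: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < self.end {
            let n = self.next;
            self.next += 1;
            Poll::Ready(Some(n)) // an item is available right away
        } else {
            Poll::Ready(None) // None marks the end of the stream
        }
    }
}

/// Waker that does nothing; enough for a stream that is always ready.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll the stream until it reports None, collecting every item.
fn drain<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            // Busy-poll for simplicity; Counter never returns Pending.
            Poll::Pending => std::thread::yield_now(),
        }
    }
}

fn main() {
    println!("{:?}", drain(Counter { next: 0, end: 3 })); // [0, 1, 2]
}
```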

Examples

Then vs. Map

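then() calls the async function and awaits the resulting future before pulling the next item, so items are processed one at a time. map(), by contrast, only turns each item into a Future without running it. buffered(n) then takes that stream of futures and polls up to n of them concurrently, yielding results in the original order. buffer_unordered(n) yields them in completion order instead. Common usage patterns follow.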
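The laziness of map() can be shown without any crates. Building the futures runs nothing; each body executes only when its future is awaited, and awaiting them one after another is what then() does. This is a std-only sketch. Here hello is a hypothetical stand-in that counts how often its body runs, block_on is a minimal hand-written executor, and the concurrency of buffered() is not modeled.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal single-future executor, standing in for a real runtime.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical stand-in for the article's `hello`: records each execution.
async fn hello(idx: usize, calls: &Cell<u32>) -> usize {
    calls.set(calls.get() + 1);
    idx * 10
}

/// Returns (executions after mapping, executions after awaiting, results).
fn map_then_await() -> (u32, u32, Vec<usize>) {
    let calls = Cell::new(0);
    // map(): only creates the futures; no hello body has run yet.
    let futures: Vec<_> = (1..3).map(|i| hello(i, &calls)).collect();
    let after_map = calls.get();
    // then()-style: await each future in turn, one at a time.
    let results = block_on(async move {
        let mut out = Vec::new();
        for fut in futures {
            out.push(fut.await);
        }
        out
    });
    (after_map, calls.get(), results)
}

fn main() {
    let (after_map, after_await, results) = map_then_await();
    println!("{after_map} {after_await} {results:?}"); // 0 2 [10, 20]
}
```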

use futures::stream::{StreamExt, TryStreamExt};
use futures::{future, stream};
use std::time::Duration;
use tokio::time::sleep;

async fn hello(idx: usize) -> Vec<usize> {
    println!("Hello idx={idx}");
    sleep(Duration::from_secs(1)).await;
    (idx..idx + 3).collect()
}

#[tokio::main]
async fn main() {
    // then(): each hello(i) is awaited before the next item is pulled (sequential, ~2s total)
    let data = futures::stream::iter(1..3)
        .then(|i| hello(i))
        //.then(|i| async move { hello(i).await }) // equivalent
        .collect::<Vec<_>>()
        .await;
    println!("All {:?}", data);

    // for_each(): runs an async closure for every item; the overall result is ()
    let data = futures::stream::iter(1..5)
        .for_each(|c| async move {
            println!("{c}");
        })
        .await;
    println!("{:?}", data);

    // try_for_each(): stops at the first Err (when x reaches 3), so data == Err(())
    let mut x = 0i32;
    let data = stream::repeat(Ok(1))
        .try_for_each(|i| {
            x += i;
            future::ready(if x == 3 { Err(()) } else { Ok(()) })
        })
        .await;
    println!("{:?}", data);

    // map() only builds the futures; buffered(10) polls up to 10 of them concurrently
    // and yields their results in input order
    let data = futures::stream::iter(1..5)
        .map(|i| async move {
            let data = hello(i).await;
            if i == 2 {
                Err::<_, String>("error".to_string())
            } else {
                Ok::<_, String>(data)
            }
        })
        //.buffer_unordered(10) // yields results in completion order instead
        .buffered(10)
        .try_for_each(|c| async move {
            println!("{:?}", c);
            Ok(())
        })
        .await;
    println!("{:?}", data);
}

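An Err can come from the stream itself or be returned by the closure. Either way, try_for_each stops immediately, the whole .await evaluates to that Err, and the remaining items are not processed. Note that by this point buffered(10) has already started all four hello calls, so "Hello idx=3" and "Hello idx=4" are still printed even though their results are discarded.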
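std's Iterator::try_for_each follows the same stop-at-first-Err rule. A synchronous analog therefore makes the behavior easy to check, though it is not the futures API itself. The helper process is hypothetical.

```rust
/// Visit items in order, recording each one, until the first Err.
fn process(items: &[i32], seen: &mut Vec<i32>) -> Result<(), String> {
    items.iter().try_for_each(|&i| {
        if i == 2 {
            // Returning Err ends the loop; items 3 and 4 are never visited.
            return Err(format!("item {i} failed"));
        }
        seen.push(i);
        Ok(())
    })
}

fn main() {
    let mut seen = Vec::new();
    let result = process(&[1, 2, 3, 4], &mut seen);
    println!("{result:?} {seen:?}"); // Err("item 2 failed") [1]
}
```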

Custom Errors

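Many third-party libraries define their own error type. The following example shows how such a type fits into a stream pipeline.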
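The ? operator converts errors through From, so a library's error type usually carries From impls for the errors it wraps, plus Display and std::error::Error. The minimal std-only sketch below assumes a hypothetical Parse variant that wraps std's ParseIntError, along with a hypothetical parse_idx helper.

```rust
use std::fmt;
use std::num::ParseIntError;

#[derive(Debug)]
pub enum Error {
    Internal(String),
    // Hypothetical variant wrapping an error from another library.
    Parse(ParseIntError),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Internal(msg) => write!(f, "internal error: {msg}"),
            Error::Parse(e) => write!(f, "parse error: {e}"),
        }
    }
}

impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Error::Internal(_) => None,
            Error::Parse(e) => Some(e),
        }
    }
}

// Lets `?` turn a ParseIntError into our Error automatically.
impl From<ParseIntError> for Error {
    fn from(e: ParseIntError) -> Self {
        Error::Parse(e)
    }
}

type Result<T, E = Error> = std::result::Result<T, E>;

// Hypothetical helper: parses an index and rejects zero.
fn parse_idx(s: &str) -> Result<usize> {
    let n: usize = s.parse()?; // ParseIntError -> Error via From
    if n == 0 {
        return Err(Error::Internal("index must be non-zero".to_string()));
    }
    Ok(n)
}

fn main() {
    println!("{:?}", parse_idx("3")); // Ok(3)
    match parse_idx("x") {
        Ok(n) => println!("parsed {n}"),
        Err(e) => println!("{e}"),
    }
}
```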

use futures::stream::{StreamExt, TryStreamExt};
use std::collections::HashSet;
use std::time::Duration;
use tokio::time::sleep;

#[allow(dead_code)] // Internal is never constructed in this short example
#[derive(Debug)]
pub enum Error {
    Internal(String),
}
type Result<T, E = Error> = std::result::Result<T, E>;

async fn hello(idx: usize) -> Result<Vec<usize>> {
    println!("Hello idx={idx}");
    sleep(Duration::from_secs(1)).await;
    Ok((idx..idx + 3).collect())
}

#[tokio::main]
async fn main() {
    // A HashSet iterates in an unspecified order
    let value: HashSet<usize> = vec![1usize, 2, 3].into_iter().collect();

    //let data = futures::stream::iter(1..5)
    let data = futures::stream::iter(value)
        .map(|i| async move {
            // `?` returns early from this async block with our own Error type
            let data = hello(i).await?;
            Ok::<_, Error>(data)
        })
        // buffer_unordered yields results as they complete; buffered(10) keeps input order
        .buffer_unordered(10) // .buffered(10)
        .try_for_each(|c| async move {
            println!("{:?}", c);
            Ok(())
        })
        .await;
    println!("{:?}", data);
}