rust - 轮询许多不同类型的 future

标签 rust async-await

我试图了解如何实现对不同类型的多个 future 进行轮询。对于上下文,我调用的 API 将返回类似以下内容:

[{"type": "source_a", "id": 123}, {"type": "source_b", "id": 234}, ...]

每个type API 响应中需要调用另一个 API,每个 API 返回不同的数据类型。我编写的代码的工作原理如下:

async fn get_data(sources: Vec<Source>) -> Data {
    let mut data = Default::default();

    for source in sources {
        if source.kind == "source_a" {
            let source_data = get_source_a(source).await;

            process_source_a(source_data, &mut data);
        } else if source.kind == "source_b" {
            ...
        }
    }

    data
}

这不会同时运行,它只会一次获取一个源并处理它们。我如何重写它,以便同时获取每个源,然后在数据可用时进行处理?说到 Rustily,我想我想要的是执行一个可变借用的闭包 data当 future 准备好时。我应该看类似 Arc<RefCell<Data>> 的东西吗? ?

最佳答案

要并行处理 future,您需要等待类似 join_all 的内容。 ,这将同时运行它们并在它们全部完成后返回。为此,您必须解决两个问题:

  • join_all 需要相同类型的 future,因此您需要将它们装箱或以其他方式统一它们。
  • data需要被多个异步 block 访问,因此需要通过ArcMutex来保护。

第一个问题可以简单地通过将异步 fns 作为任务生成来解决,这具有潜在并行运行它们的额外优势(除了它们同时运行之外)。下面的示例使用 tokio::spawn ,但对于 async_std 几乎完全相同。由于没有可重现的示例,我无法测试代码,但它可能如下所示:

async fn get_data(sources: Vec<Source>) -> Data {
    let data = Arc::new(Mutex::new(Data::default()));
    let mut tasks = vec![];

    for source in sources {
        if source.kind == "source_a" {
            let data = Arc::clone(&data);
            tasks.push(tokio::task::spawn(async move {
                let source_data = get_source_a(source).await;
                process_source_a(source_data, &mut data.lock().unwrap());
            }));
        } else if source.kind == "source_b" {
            // ...
        }
    }

    // Wait for all sources to finish and propagate the panic if any.
    // With async_std this wouldn't require the `for_each()`.
    futures::future::join_all(tasks)
        .await
        .for_each(|x| x.unwrap());

    // As all tasks are done, there should be no references to `data` at
    // this point, so we can extract it out of the Arc<Mutex<_>> wrapping.
    data.try_unwrap().unwrap().into_inner()
}

关于rust - 轮询许多不同类型的 future,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66810034/

相关文章:

c# - Task.Continuewith(在当前线程上)

javascript - 异步函数里面有for of循环和await,它会等待吗?

Javascript,一个带有回调处理程序的对象,如何将其重写为await/async风格?

math - 在 rust : what is `-inf` ? 上计算时我有 `-inf`

rust - 赋予调用者对彼此依赖的局部变量的所有权

rust - String.iter() 在 Rust 中做什么?

c# - 调用异步方法时到底发生了什么?

rust - 使用泛型时无法借用自己的结构成员作为可变成员

assembly - 如何获取 VESA BIOS 信息

c# - 如何使多个 API 调用更快?