我试图了解如何实现对不同类型的多个 future 进行轮询。对于上下文,我调用的 API 将返回类似以下内容:
[{"type": "source_a", "id": 123}, {"type": "source_b", "id": 234}, ...]
每个type
API 响应中需要调用另一个 API,每个 API 返回不同的数据类型。我编写的代码的工作原理如下:
async fn get_data(sources: Vec<Source>) -> Data {
let mut data = Default::default();
for source in sources {
if source.kind == "source_a" {
let source_data = get_source_a(source).await;
process_source_a(source_data, &mut data);
} else if source.kind == "source_b" {
...
}
}
data
}
这不会同时运行,它只会一次获取一个源并处理它们。我如何重写它,以便同时获取每个源,然后在数据可用时进行处理?说到 Rustily,我想我想要的是执行一个可变借用的闭包 data
当 future 准备好时。我应该看类似 Arc<RefCell<Data>>
的东西吗? ?
最佳答案
要并行处理 future,您需要等待类似 join_all
的内容。 ,这将同时运行它们并在它们全部完成后返回。为此,您必须解决两个问题:
join_all
需要相同类型的 future,因此您需要将它们装箱或以其他方式统一它们。data
需要被多个异步 block 访问,因此需要通过Arc
和Mutex
来保护。
第一个问题可以简单地通过将异步 fns 作为任务生成来解决,这具有潜在并行运行它们的额外优势(除了它们同时运行之外)。下面的示例使用 tokio::spawn
,但对于 async_std
几乎完全相同。由于没有可重现的示例,我无法测试代码,但它可能如下所示:
async fn get_data(sources: Vec<Source>) -> Data {
let data = Arc::new(Mutex::new(Data::default()));
let mut tasks = vec![];
for source in sources {
if source.kind == "source_a" {
let data = Arc::clone(&data);
tasks.push(tokio::task::spawn(async move {
let source_data = get_source_a(source).await;
process_source_a(source_data, &mut data.lock().unwrap());
}));
} else if source.kind == "source_b" {
// ...
}
}
// Wait for all sources to finish and propagate the panic if any.
// With async_std this wouldn't require the `for_each()`.
futures::future::join_all(tasks)
.await
.for_each(|x| x.unwrap());
// As all tasks are done, there should be no references to `data` at
// this point, so we can extract it out of the Arc<Mutex<_>> wrapping.
data.try_unwrap().unwrap().into_inner()
}
关于rust - 轮询许多不同类型的 future,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66810034/