rust - 跨多个异步调用超时的最佳方法?

标签 rust rust-tokio

不是询问如何在单个超时中包装同时进行的多个异步调用,如下所示:

tokio::time::timeout(
    Duration::from_millis(10), 
    tokio::join!(call1, call2, ...)
).await

上下文是我有一个异步 fn,它由一个循环组成,该循环进行一系列异步调用,并且根据这些调用的返回值,它可以处于两种状态之一:A 和非 A。

async fn foo() {
    // boolean (true when in state A, false otherwise)
    let mut state_a = true;
    loop {
        state_a = call_1().await;  // for simplicity the results are all bools
        state_a = call_2().await;
        ...
        state_a = call_n().await;
    }
}

当处于状态A时,一切都是稳定的。但是,当处于非 A 状态时,返回状态 A 有一个截止时间(例如 5 秒),如果时间过去,那么我们必须继续(返回到循环开始)。

向此代码添加计时器逻辑以实现上述目标的最简洁方法是什么?我能想到的就是在未来设置一个 tokio::time::Timeout 5 秒,每个调用都包含在其中。但是每次进入状态 not-A 时我都必须启动它并且还要在每次调用时检查我们是否处于状态 A 来决定是否换行,代码很快就会变得异常困惑。有没有更干净的方法?

最佳答案

这是使用常规控制流的一个。状态存储在timeout中,None代表A,Some代表not-A。 (playground)

use core::future::Future;
use core::time::Duration;
use tokio::time::timeout_at;
use tokio::time::Instant;

pub async fn foo() {
    let mut timeout: Option<Instant> = None;
    loop {
        if !run_and_update_timeout(&mut timeout, call_1()).await {
            continue;
        }
        if !run_and_update_timeout(&mut timeout, call_2()).await {
            continue;
        }
        if !run_and_update_timeout(&mut timeout, call_n()).await {
            continue;
        }
    }
}

/// Returns true when the next future should run, returns false when the loop should be continued.
async fn run_and_update_timeout<F>(timeout: &mut Option<Instant>, future: F) -> bool
where
    F: Future<Output = bool>,
{
    if let &mut Some(t) = timeout {
        // In state !A, timeout active
        let call_or_timeout = timeout_at(t, future).await;
        match call_or_timeout {
            // Future completed first and succeeded, clear timeout
            Ok(true) => *timeout = None,
            // Future completed and failed, move on to next future
            Ok(false) => (),
            // Timeout finished first, continue the loop
            Err(_) => {
                *timeout = None;
                return false;
            }
        }
    } else {
        // In state A, no timeout active
        if !future.await {
            // If this future fails, activate timeout
            *timeout = Some(Instant::now() + Duration::from_secs(5));
        }
    }
    true
}

与 Chayim 相比,此方法的优点是每个 future 只检查一次状态,而不是在每个 Pending 上检查一次,但作为交换,状态在其中一个 future 内无法更改.

您的问题中没有指定的一件事是继续时状态会发生什么。我写这个是为了将状态重置为 A,但您可能想保留 not-A 并仅重置计时器。

关于rust - 跨多个异步调用超时的最佳方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76631005/

相关文章:

rust - 使用rust 生命周期说明符的问题

rust - 如何在 .unzip() 返回的每个迭代器上使用 .collect()?

rust - Rust借阅检查器是否在本地或全局分析程序?

Rust 与 Futures 和 Tokio 并发执行

rust - IntoIterator 作为函数参数不接受适配器结构

rust - 借用内容的作业

rust - 如何实现 future 功能的流

rust - tokio::spawn以“静态生存期”捕获

tcp - 在Tokio中使用TcpStream读取碎片化的TCP数据包

rust - 在删除 Rust Future 时 panic 运行异步代码