rust - 如何查找tokio::sync::mpsc::Receiver是否已关闭?

标签 rust future rust-tokio

我有一个循环,我在其中做一些工作,并使用Sender发送结果。工作需要时间,如果失败,我需要重试。重试时,接收器可能已关闭,重试可能会浪费时间。因此,我需要一种无需发送消息即可检查Receiver是否可用的方法。
在理想的世界中,我希望我的代码在伪代码中看起来像这样:

let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
   // do som stuff with rx and drop it after some time
    rx.recv(...).await;
});

let mut attempts = 0;
loop {
    if tx.is_closed() {
       break;
    }
    if let Ok(result) = do_work().await {
        attempts = 0;
        let _ = tx.send(result).await;
    } else {
        if attempts >= 10 {
            break;
        } else {
            attempts += 1;
            continue;
        }
    }
};
问题在于Sender没有is_closed方法。它确实具有pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>,但是我不知道Context是什么,或者在哪里可以找到它。
当我没有发送值时,如何检查发送者是否可以发送?

最佳答案

Sender具有try_send方法:

Attempts to immediately send a message on this Sender

This method differs from send by returning immediately if the channel's buffer is full or no receiver is waiting to acquire some data. Compared with send, this function has two failure cases instead of one (one for disconnection, one for a full buffer).


使用它而不是send并检查错误:
if let Err(TrySendError::Closed(_)) = tx.send(result).await {
    break;
}

可以通过使用poll_fn crate 中的futures来执行所需的操作。它将返回Poll的函数改编为返回Future
use futures::future::poll_fn; // 0.3.5
use std::future::Future;
use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
use tokio::time::delay_for; // 0.2.22

fn wait_until_ready<'a, T>(
    sender: &'a mut Sender<T>,
) -> impl Future<Output = Result<(), ClosedError>> + 'a {
    poll_fn(move |cx| sender.poll_ready(cx))
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = channel::<i32>(1);

    tokio::spawn(async move {
        // Receive one value and close the channel;
        let val = rx.recv().await;
        println!("{:?}", val);
    });

    wait_until_ready(&mut tx).await.unwrap();
    tx.send(123).await.unwrap();

    wait_until_ready(&mut tx).await.unwrap();
    delay_for(std::time::Duration::from_secs(1)).await;
    tx.send(456).await.unwrap(); // 456 likely never printed out,
                                 // despite having a positive readiness response
                                 // and the send "succeeding"
}
但是请注意,在一般情况下,这容易受到TOCTOU的影响。即使Senderpoll_ready在 channel 中保留了一个插槽供以后使用,也可能在准备检查和实际发送之间关闭了接收端。我试图在代码中指出这一点。

关于rust - 如何查找tokio::sync::mpsc::Receiver是否已关闭?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64100173/

相关文章:

java - 使循环多线程昂贵,Java

java - 在 Java 中链接可变数量的 Promise (CompletableFuture)

rust - 闭包参数的生命周期注解

rust - 检查宏中定义的项目中的功能标志

multithreading - Rust 中大量线程的性能下降

rust - 如何使用 tokio async TcpStream 将 bevy 游戏连接到外部 TCP 服务器?

rust - 如何共享tokio::net::TcpStream同时作用于它?

generics - 用泛型全面实现特征

java - 并行计算

rust - 如何使用 Rust 跟踪获取/存储跨度持续时间?