我有一个循环,我在其中做一些工作,并使用Sender
发送结果。工作需要时间,如果失败,我需要重试。重试时,接收器可能已关闭,重试可能会浪费时间。因此,我需要一种无需发送消息即可检查Receiver
是否可用的方法。
在理想的世界中,我希望我的代码在伪代码中看起来像这样:
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
// do som stuff with rx and drop it after some time
rx.recv(...).await;
});
let mut attempts = 0;
loop {
if tx.is_closed() {
break;
}
if let Ok(result) = do_work().await {
attempts = 0;
let _ = tx.send(result).await;
} else {
if attempts >= 10 {
break;
} else {
attempts += 1;
continue;
}
}
};
问题在于Sender
没有is_closed
方法。它确实具有pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>
,但是我不知道Context
是什么,或者在哪里可以找到它。当我没有发送值时,如何检查发送者是否可以发送?
最佳答案
Sender
具有try_send方法:
Attempts to immediately send a message on this Sender
This method differs from send by returning immediately if the channel's buffer is full or no receiver is waiting to acquire some data. Compared with send, this function has two failure cases instead of one (one for disconnection, one for a full buffer).
使用它而不是
send
并检查错误:if let Err(TrySendError::Closed(_)) = tx.send(result).await {
break;
}
可以通过使用
poll_fn
crate 中的futures
来执行所需的操作。它将返回Poll
的函数改编为返回Future
use futures::future::poll_fn; // 0.3.5
use std::future::Future;
use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
use tokio::time::delay_for; // 0.2.22
fn wait_until_ready<'a, T>(
sender: &'a mut Sender<T>,
) -> impl Future<Output = Result<(), ClosedError>> + 'a {
poll_fn(move |cx| sender.poll_ready(cx))
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = channel::<i32>(1);
tokio::spawn(async move {
// Receive one value and close the channel;
let val = rx.recv().await;
println!("{:?}", val);
});
wait_until_ready(&mut tx).await.unwrap();
tx.send(123).await.unwrap();
wait_until_ready(&mut tx).await.unwrap();
delay_for(std::time::Duration::from_secs(1)).await;
tx.send(456).await.unwrap(); // 456 likely never printed out,
// despite having a positive readiness response
// and the send "succeeding"
}
但是请注意,在一般情况下,这容易受到TOCTOU的影响。即使Sender
的poll_ready
在 channel 中保留了一个插槽供以后使用,也可能在准备检查和实际发送之间关闭了接收端。我试图在代码中指出这一点。
关于rust - 如何查找tokio::sync::mpsc::Receiver是否已关闭?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64100173/