我的要求很简单,在很多程序中都是很合理的要求。就是在指定的时间后向我的Channel
发送指定的消息。
我已经检查了 tokio
与 delay
、interval
或 timeout
相关的主题,但没有一个看起来很容易实现。
我现在想到的是 spawn
一个异步任务,然后 wait
或 sleep
一段时间,最后发送消息。
但是,显然,生成异步任务是一项相对繁重的操作。有更好的解决方案吗?
async fn my_handler(sender: mpsc::Sender<i32>, dur: Duration) {
tokio::spawn(async {
time::sleep(dur).await;
sender.send(0).await;
}
}
最佳答案
您可以尝试添加第二个 channel 和一个持续运行的任务来缓冲消息,直到它们被接收。实现这个比听起来更复杂,我希望我在这里处理取消:
fn make_timed_channel<T: Ord + Send + Sync + 'static>() -> (Sender<(Instant, T)>, Receiver<T>) {
// Ord is an unnecessary requirement arising from me stuffing both the Instant and the T into the Binary heap
// You could drop this requirement by using the priority_queue crate instead
let (sender1, receiver1) = mpsc::channel::<(Instant, T)>(42);
let (sender2, receiver2) = mpsc::channel::<T>(42);
let mut receiver1 = Some(receiver1);
tokio::spawn(async move {
let mut buf = std::collections::BinaryHeap::<Reverse<(Instant, T)>>::new();
loop {
// Pretend we're a bounded channel or exit if the upstream closed
if buf.len() >= 42 || receiver1.is_none() {
match buf.pop() {
Some(Reverse((time, element))) => {
sleep_until(time).await;
if sender2.send(element).await.is_err() {
break;
}
}
None => break,
}
}
// We have some deadline to send a message at
else if let Some(Reverse((then, _))) = buf.peek() {
if let Ok(recv) = timeout_at(*then, receiver1.as_mut().unwrap().recv()).await {
match recv {
Some(recv) => buf.push(Reverse(recv)),
None => receiver1 = None,
}
} else {
if sender2.send(buf.pop().unwrap().0 .1).await.is_err() {
break;
}
}
}
// We're empty, wait around
else {
match receiver1.as_mut().unwrap().recv().await {
Some(recv) => buf.push(Reverse(recv)),
None => receiver1 = None,
}
}
}
});
(sender1, receiver2)
}
这是否比生成任务更有效,您必须进行基准测试。 (我对此表示怀疑。Tokio iirc 有一些比 BinaryHeap
更高级的解决方案,用于等待在下一次超时时唤醒,例如)
如果您不需要 Receiver<T>
,您可以进行一项优化但就是.poll().await
可以调用:您可以删除第二个 channel 并维护 BinaryHeap
在自定义接收器中。
关于rust - 如何便宜地发送延迟消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73895393/