rust - 如何便宜地发送延迟消息?

标签 rust rust-tokio

我的要求很简单,在很多程序中都是很合理的要求。就是在指定的时间后向我的Channel发送指定的消息。

我已经检查了 tokiodelayintervaltimeout 相关的主题,但没有一个看起来很容易实现。

我现在想到的是 spawn 一个异步任务,然后 waitsleep 一段时间,最后发送消息。

但是,显然,生成异步任务是一项相对繁重的操作。有更好的解决方案吗?

async fn my_handler(sender: mpsc::Sender<i32>, dur: Duration) {
    tokio::spawn(async {
        time::sleep(dur).await;
        sender.send(0).await;
    }
}

最佳答案

您可以尝试添加第二个 channel 和一个持续运行的任务来缓冲消息,直到它们被接收。实现这个比听起来更复杂,我希望我在这里处理取消:

fn make_timed_channel<T: Ord + Send + Sync + 'static>() -> (Sender<(Instant, T)>, Receiver<T>) {
    // Ord is an unnecessary requirement arising from me stuffing both the Instant and the T into the Binary heap
    // You could drop this requirement by using the priority_queue crate instead

    let (sender1, receiver1) = mpsc::channel::<(Instant, T)>(42);
    let (sender2, receiver2) = mpsc::channel::<T>(42);
    let mut receiver1 = Some(receiver1);
    tokio::spawn(async move {
        let mut buf = std::collections::BinaryHeap::<Reverse<(Instant, T)>>::new();
        loop {
            // Pretend we're a bounded channel or exit if the upstream closed
            if buf.len() >= 42 || receiver1.is_none() {
                match buf.pop() {
                    Some(Reverse((time, element))) => {
                        sleep_until(time).await;
                        if sender2.send(element).await.is_err() {
                            break;
                        }
                    }
                    None => break,
                }
            }
            // We have some deadline to send a message at
            else if let Some(Reverse((then, _))) = buf.peek() {
                if let Ok(recv) = timeout_at(*then, receiver1.as_mut().unwrap().recv()).await {
                    match recv {
                        Some(recv) => buf.push(Reverse(recv)),
                        None => receiver1 = None,
                    }
                } else {
                    if sender2.send(buf.pop().unwrap().0 .1).await.is_err() {
                        break;
                    }
                }
            }
            // We're empty, wait around
            else {
                match receiver1.as_mut().unwrap().recv().await {
                    Some(recv) => buf.push(Reverse(recv)),
                    None => receiver1 = None,
                }
            }
        }
    });
    (sender1, receiver2)
}

Playground

这是否比生成任务更有效,您必须进行基准测试。 (我对此表示怀疑。Tokio iirc 有一些比 BinaryHeap 更高级的解决方案,用于等待在下一次超时时唤醒,例如)

如果您不需要 Receiver<T>,您可以进行一项优化但就是.poll().await可以调用:您可以删除第二个 channel 并维护 BinaryHeap在自定义接收器中。

关于rust - 如何便宜地发送延迟消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73895393/

相关文章:

git - 我如何压缩 zlib 数据并找出有多少输入字节?

python - 通过Rust的派生生成Python Tkinter进程不会持续提供stdout

http - 如何在 Rust 中使用 hyper、tokio 和 futures 为 HTTP 请求设置超时?

rust - 如何复制Rust Stream

random - 我可以有效地从 HashSet 中随机抽样吗?

rust - 为什么在使用非文字模式时无法访问此匹配模式?

return - 无限循环中的返回和中断有什么区别?

rust - 使用 Hyper 同时获取多个 URL

rust - 从函数返回 future 值

rust - 如何监控reqwest客户端上传进度