timer - 如何使用 Tokio 产生许多可取消的计时器?

标签 timer rust rust-tokio

如何使用 Tokio 实现固定数量的计时器,这些计时器会跨线程定期重置和取消?当计时器到期时,将执行回调。

类似于 Go 的 time.AfterFunc 的 API 本质上是我想要的:

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.AfterFunc(time.Hour, func() {
        // happens every 2 seconds with 1 second delay
        fmt.Println("fired")
    })

    for {
        t.Reset(time.Second)
        time.Sleep(time.Second * 2)
    }
}

我发现实现(足够)相似 API 的唯一 crate 是 timer它通过产生 2 个线程以一种非常幼稚的方式实现。当计时器经常重置时,这很快就会变得令人望而却步。

显而易见的答案是使用 Tokio,问题是如何优雅地做到这一点。

一个选项是每次更新计时器时生成一个新的绿色线程,并使用原子取消前一个计时器,方法是在这个原子上调节回调的执行,例如这个伪 Rust:

tokio::run({
    // for every timer spawn with a new "cancel" atomic
    tokio::spawn({
        Delay::new(Instant::now() + Duration::from_millis(1000))
            .map_err(|e| panic!("timer failed; err={:?}", e))
            .and_then(|_| {
                if !cancelled.load(Ordering::Acquire) {
                    println!("fired");
                }
                Ok(())
            })
    })
})

问题是我维护已经取消的计时器的状态,可能持续几分钟。此外,它看起来并不优雅。

除了tokio::time::Delaytokio::time::DelayQueue似乎也适用。特别是,通过使用从“插入”返回的 Key 引用它们来重置和取消计时器的能力。

不清楚如何在多线程应用程序中使用这个库,即:

The return value represents the insertion and is used at an argument to remove and reset. Note that Key is token and is reused once value is removed from the queue either by calling poll after when is reached or by calling remove. At this point, the caller must take care to not use the returned Key again as it may reference a different item in the queue.

这会在通过其键取消计时器的任务与使用 DelayQueue 流中的计时器事件的任务之间创建竞争条件——导致 panic 或取消不相关的计时器。

最佳答案

您可以将来自 futures-rs 的 Select 组合器与 Tokio 一起使用。它返回第一个完成的 future 的结果,然后忽略/停止轮询另一个。

作为第二个 future ,我们可以使用来自 oneshot::channel 的接收器来创建信号以完成我们的组合器 future 。

use futures::sync::oneshot;
use futures::*;
use std::thread;
use std::time::{Duration, Instant};
use tokio::timer::Delay;

fn main() {
    let (interrupter, interrupt_handler) = oneshot::channel::<()>();

    //signal to cancel delayed call
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500)); //increase this value more than 1000ms to see is delayed call is working or not.
        interrupter
            .send(())
            .expect("Not able to cancel delayed future");
    });

    let delayed = Delay::new(Instant::now() + Duration::from_millis(1000))
        .map_err(|e| panic!("timer failed; err={:?}", e))
        .and_then(|_| {
            println!("Delayed Call Executed!");

            Ok(())
        });

    tokio::run(delayed.select(interrupt_handler).then(|_| Ok(())));
}

Playground

关于timer - 如何使用 Tokio 产生许多可取消的计时器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57357978/

相关文章:

Android,添加许多带计时器的按钮

ios - 使用自定义 Timer NSObject 在你的 ios 应用程序中计时事件

rust - 为返回的结果指定错误类型(在SinkExt.with的上下文中)

oop - 有没有办法将 Trait 实现和防御拆分到不同的模块中?

rust - 我是否需要使用某种内部可变性来创建两个Arc之间的循环?

Rust 与 Futures 和 Tokio 并发执行

rust - 如何在warp中将传入流写入文件?

java - 选择 JCombobox 时 GUI 卡住,如何在 ActionListener 中使用条件语句在 Swing 应用程序中使用计时器?

jquery - 如何仅在经过一定时间后才显示消息 jquery - 阻止表单提交

rust - 更新 BTreeSet 中的结构