asynchronous - 当 future 包装它被丢弃时,如何停止运行同步代码?

标签 asynchronous rust future

我有调用同步代码的异步代码需要一段时间才能运行,所以我遵循了 What is the best approach to encapsulate blocking I/O in future-rs? 中概述的建议。 .但是,我的异步代码有一个超时,之后我不再对同步计算的结果感兴趣:

use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10

// This takes 1 second
fn long_running_complicated_calculation() -> i32 {
    let mut sum = 0;
    for i in 0..10 {
        thread::sleep(Duration::from_millis(100));
        eprintln!("{}", i);
        sum += i;
        // Interruption point
    }
    sum
}

#[tokio::main]
async fn main() {
    let handle = task::spawn_blocking(long_running_complicated_calculation);
    let guarded = time::timeout(Duration::from_millis(250), handle);

    match guarded.await {
        Ok(s) => panic!("Sum was calculated: {:?}", s),
        Err(_) => eprintln!("Sum timed out (expected)"),
    }
}

运行此代码显示超时触发,但同步代码 还有继续运行:

0
1
Sum timed out (expected)
2
3
4
5
6
7
8
9

当 future 包装它被丢弃时,如何停止运行同步代码?

我不希望编译器能够神奇地停止我的同步代码。我用“中断点”注释了一行,我需要手动进行某种检查以提前退出我的函数,但我不知道如何轻松获得 spawn_blocking 的结果的通知(或 ThreadPool::spawn_with_handle ,对于纯基于 future 的代码)已被删除。

最佳答案

您可以传递一个原子 bool 值,然后使用它来将任务标记为需要取消。 (我不确定我是否为 Ordering/load 调用使用了合适的 store,这可能需要更多考虑)

这是您的代码的修改版本,输出

0
1
Sum timed out (expected)
2
Interrupted...

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10

// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
    let mut sum = 0;
    for i in 0..10 {
        thread::sleep(Duration::from_millis(100));
        eprintln!("{}", i);
        sum += i;
        // Interruption point
        if !flag.load(Ordering::Relaxed) {
            eprintln!("Interrupted...");
            break;
        }
    }
    sum
}

#[tokio::main]
async fn main() {
    let some_bool = Arc::new(AtomicBool::new(true));

    let some_bool_clone = some_bool.clone();
    let handle =
        task::spawn_blocking(move || long_running_complicated_calculation(&some_bool_clone));
    let guarded = time::timeout(Duration::from_millis(250), handle);

    match guarded.await {
        Ok(s) => panic!("Sum was calculated: {:?}", s),
        Err(_) => {
            eprintln!("Sum timed out (expected)");
            some_bool.store(false, Ordering::Relaxed);
        }
    }
}

playground

在当前 Tokio 删除 future /句柄时自动发生这种情况是不可能的。 http://github.com/tokio-rs/tokio/issues/1830 中正在为此做一些工作。和 http://github.com/tokio-rs/tokio/issues/1879 .

但是,您可以通过将 future 包装在自定义类型中来获得类似的东西。

这是一个看起来与原始代码几乎相同的示例,但在模块中添加了一个简单的包装器类型。如果我实现 Future<T> 会更符合人体工程学在包装类型上,它只是转发到包裹的 handle ,但事实证明这很烦人。
mod blocking_cancelable_task {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use tokio::task;

    pub struct BlockingCancelableTask<T> {
        pub h: Option<tokio::task::JoinHandle<T>>,
        flag: Arc<AtomicBool>,
    }

    impl<T> Drop for BlockingCancelableTask<T> {
        fn drop(&mut self) {
            eprintln!("Dropping...");
            self.flag.store(false, Ordering::Relaxed);
        }
    }

    impl<T> BlockingCancelableTask<T>
    where
        T: Send + 'static,
    {
        pub fn new<F>(f: F) -> BlockingCancelableTask<T>
        where
            F: FnOnce(&AtomicBool) -> T + Send + 'static,
        {
            let flag = Arc::new(AtomicBool::new(true));
            let flag_clone = flag.clone();
            let h = task::spawn_blocking(move || f(&flag_clone));
            BlockingCancelableTask { h: Some(h), flag }
        }
    }

    pub fn spawn<F, T>(f: F) -> BlockingCancelableTask<T>
    where
        T: Send + 'static,
        F: FnOnce(&AtomicBool) -> T + Send + 'static,
    {
        BlockingCancelableTask::new(f)
    }
}

use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time::Duration};
use tokio::time; // 0.2.10

// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
    let mut sum = 0;
    for i in 0..10 {
        thread::sleep(Duration::from_millis(100));
        eprintln!("{}", i);
        sum += i;
        // Interruption point
        if !flag.load(Ordering::Relaxed) {
            eprintln!("Interrupted...");
            break;
        }
    }
    sum
}

#[tokio::main]
async fn main() {
    let mut h = blocking_cancelable_task::spawn(long_running_complicated_calculation);
    let guarded = time::timeout(Duration::from_millis(250), h.h.take().unwrap());
    match guarded.await {
        Ok(s) => panic!("Sum was calculated: {:?}", s),
        Err(_) => {
            eprintln!("Sum timed out (expected)");
        }
    }
}

playground

关于asynchronous - 当 future 包装它被丢弃时,如何停止运行同步代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59977693/

相关文章:

javascript - 如何将函数值存储到变量nodejs服务器get-folder-size中

rust - Rust无法识别借用在循环结束时结束

rust - 为什么 Iterator::take_while 取得迭代器的所有权?

java - 有没有办法让线程池线程退出处理给定的任务?

php - FaceBook 注册插件异步验证表单(检查用户名可用性)

c# - 一次设置最大运行任务时等待多个异步任务

javascript - 如何使用 WebGL 异步加载图像、创建纹理、渲染和保存图像?

macos - 将 Rust 应用程序从 macOS 交叉编译到 Raspberry Pi 2 时出现 "linking with arm-linux-gnueabihf-gcc failed"

asynchronous - 在未来的实现中手动轮询流

scala - 如何等待多个 future ?