rust - 如何在不阻塞父任务的情况下在另一个任务中生成长时间运行的 Tokio 任务?

标签 rust rust-tokio

我正在尝试构建一个可以管理来自 websocket 的提要但能够在多个提要之间切换的对象。

有一个 Feed 特征:

trait Feed {
    async fn start(&mut self);
    async fn stop(&mut self);
}

实现 Feed 的结构体有 3 个:ABC

start 被调用时,它会启动一个无限循环,监听来自 websocket 的消息并在收到消息时处理每条消息。

我想实现一个 FeedManager,它维护单个事件提要但可以接收命令以切换它正在使用的提要源。

enum FeedCommand {
    Start(String),
    Stop,
}

struct FeedManager {
    active_feed_handle: tokio::task::JoinHandle,
    controller: mpsc::Receiver<FeedCommand>,
}

impl FeedManager {
    async fn start(&self) {
        while let Some(command) = self.controller.recv().await {
            match command {
                FeedCommand::Start(feed_type) => {
                    // somehow tell the active feed to stop (need channel probably) or kill the task?

                    if feed_type == "A" {
                        // replace active feed task with a new tokio task for consuming feed A
                    } else if feed_type == "B" {
                        // replace active feed task with a new tokio task for consuming feed B
                    } else {
                        // replace active feed task with a new tokio task for consuming feed C
                    }
                }
            }
        }
    }
}

我正在努力了解如何正确管理所有 Tokio 任务。 FeedManager 的核心循环是永远监听传入的新命令,但它需要能够在不阻塞的情况下生成另一个长期存在的任务(以便它可以监听命令)。

我的第一次尝试是:

if feed_type == "A" {
    self.active_feed_handle = tokio::spawn(async {
        A::new().start().await;
    });

    self.active_feed_handle.await
}
  • 句柄上的 .await 会导致核心循环不再接受命令,对吧?
  • 我可以省略最后一个 .await 并让任务继续运行吗?
  • 我是否需要以某种方式清理当前事件的任务?

最佳答案

您可以通过生成任务来生成长时间运行的 Tokio 任务而不阻塞父任务 — 这是任务存在的主要原因。如果你不.await任务,那么你就不会等待任务:

use std::time::Duration;
use tokio::{task, time}; // 1.3.0

#[tokio::main]
async fn main() {
    task::spawn(async {
        time::sleep(Duration::from_secs(100)).await;
        eprintln!(
            "You'll likely never see this printed \
            out because the parent task has exited \
            and so has the entire program"
        );
    });
}

另见:

关于rust - 如何在不阻塞父任务的情况下在另一个任务中生成长时间运行的 Tokio 任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66895709/

相关文章:

rust - 如何在 tokio_core::io::Codec::decode(...) 中实现零拷贝?

rust - 我什么时候应该对新 cargo 使用 --bin 选项?

rust - 有没有办法只在 rustdoc 验证示例时启用 Cargo 功能?

rust - 如何将 min_by_key 或 max_by_key 与迭代期间创建的值的引用一起使用?

reference - 为什么在Rust中需要显式的生存期?

rust - 如何使用reqwest执行并行异步HTTP GET请求?

asynchronous - 在未来的实现中手动轮询流

rust - 你如何访问 Rust 中的枚举值?

rust - 为什么任务超时时不 panic ?

rust - 如何使用 futures::sync::mpsc::channel 实现阻塞队列机制?