rust - 在异步任务中读取 stdin 时为 "blocking annotated I/O must be called from the context of the Tokio runtime"

标签 rust rust-tokio

我正在尝试从异步任务中的标准输入读取数据,该任务由 tokio::spawn 生成。这 执行者被包装为

let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();

然后使用 executor.task(...) 运行主任务,这会产生其他任务 使用 tokio::spawn()

fn main 然后调用 executor.run().unwrap(); 等待所有任务完成。

问题是我什么时候做

let mut stdin = tokio::io::stdin();
let mut read_buf: [u8; 1024] = [0; 1024];
...
stdin.read(&mut read_buf).await

我收到“必须从 Tokio 运行时的上下文调用阻塞注释 I/O”错误。

依赖关系:

futures-preview = { version = "0.3.0-alpha.18",  features = ["async-await", "nightly"] }
tokio = "0.2.0-alpha.2"
tokio-net = "0.2.0-alpha.2"
tokio-sync = "0.2.0-alpha.2"

完整代码:

extern crate futures;
extern crate tokio;
extern crate tokio_net;
extern crate tokio_sync;

use std::io::Write;
use std::net::SocketAddr;
use tokio::io::AsyncReadExt;
use tokio::net::tcp::split::{TcpStreamReadHalf, TcpStreamWriteHalf};
use tokio::net::TcpListener;
use tokio_sync::oneshot;

use futures::select;

use futures::future::FutureExt;

#[derive(Debug)]
enum AppErr {
    CantBindAddr(std::io::Error),
    CantAccept(std::io::Error),
}

fn main() {
    let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();

    executor.spawn(async {
        match server_task().await {
            Ok(()) => {}
            Err(err) => {
                println!("Error: {:?}", err);
            }
        }
    });

    executor.run().unwrap(); // ignores RunError
}

async fn server_task() -> Result<(), AppErr> {
    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    let mut listener = TcpListener::bind(&addr).map_err(AppErr::CantBindAddr)?;

    loop {
        print!("Waiting for incoming connection...");
        let _ = std::io::stdout().flush();
        let (socket, _) = listener.accept().await.map_err(AppErr::CantAccept)?;
        println!("{:?} connected.", socket);
        let (read, write) = socket.split();

        let (abort_in_task_snd, abort_in_task_rcv) = oneshot::channel();
        let (abort_out_task_snd, abort_out_task_rcv) = oneshot::channel();

        tokio::spawn(handle_incoming(read, abort_in_task_rcv, abort_out_task_snd));
        tokio::spawn(handle_outgoing(
            write,
            abort_out_task_rcv,
            abort_in_task_snd,
        ));
    }
}

async fn handle_incoming(
    mut conn: TcpStreamReadHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    println!("handle_incoming");

    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();

    loop {
        select! {
            abort_ret = abort_in_fused => {
                // TODO match abort_ret {..}
                println!("abort signalled, handle_incoming returning");
                return;
            },
            bytes = conn.read(&mut read_buf).fuse() => {
                match bytes {
                    Err(io_err) => {
                        println!("io error when reading input stream: {:?}", io_err);
                        return;
                    }
                    Ok(bytes) => {
                        println!("read {} bytes: {:?}", bytes, &read_buf[0..bytes]);
                    }
                }
            }
        }
    }
}

async fn handle_outgoing(
    conn: TcpStreamWriteHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    println!("handle_outgoing");

    let mut stdin = tokio::io::stdin();
    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();

    loop {
        select! {
            abort_ret = abort_in_fused => {
                println!("abort signalled, handle_outgoing returning");
                return;
            }
            input = stdin.read(&mut read_buf).fuse() => {
                match input {
                    Err(io_err) => {
                        println!("io error when reading stdin: {:?}", io_err);
                        return;
                    }
                    Ok(bytes) => {
                        println!("handle_outgoing read {} bytes", bytes);
                        // TODO
                    }
                }
            },
        }
    }
}

问题:

  • 我的任务生成正确吗?我可以在 main 中安全地执行 tokio::spawn 吗? 任务传递给 executor.spawn()?
  • 我在这个程序中读取标准输入的方式有什么问题?

谢谢

最佳答案

Tokio stdin 阻塞了执行器池中的封闭线程,因为它被注释为来自 tokio-executor 的 blockingFrom the reference :

When the blocking function enters, it hands off the responsibility of processing the current work queue to another thread.

您的代码无法正常工作,因为您使用的执行器在单个线程中多路复用任务(tokio::runtime::current_thread::Runtime::new())。因此将没有其他线程为执行者执行其他任务。

如果你正确配置你的运行时(多线程线程池)你的代码将工作正常:

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let mut executor = rt.executor();

    executor.spawn(async {
        match server_task().await {
            Ok(()) => {}
            Err(err) => {
                println!("Error: {:?}", err);
            }
        }
    });

    rt.shutdown_on_idle();
}

另请参阅: How can I stop reading from a tokio::io::lines stream?

关于rust - 在异步任务中读取 stdin 时为 "blocking annotated I/O must be called from the context of the Tokio runtime",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57590175/

相关文章:

rust - 如何从产生数据 block 的慢速处理侧线程流式传输 super 请求的主体?

rust - 匹配来自外部库的依赖于版本的枚举值

rust - 如何为给定类型选择不同的 std::cmp::Ord(或其他特征)实现?

c++ - Rust 找不到 Microsoft C++ 构建工具

rust - 可转换为 isize 的枚举的特征

Rust 异步循环函数阻止另一个 future 执行

hash - 为什么 hash() 和 hasher.write() 的结果不一样?

asynchronous - 如何在 Tokio future 链的多个分支中使用 TcpStream?

rust - 如何使用 Tokio 远程关闭正在运行的任务

rust - 如何通过读取和转换文件来创建Stream?