rust - 从AsyncRead转发到 future 0.3 mpsc::UnboundedSender <T>时的错误处理

标签 rust rust-tokio

我想将实现tokio::process::child::ChildStdouttokio::io::AsyncRead的输出转发到实现futures::channel::mpsc::UnboundedSender<MyType>futures::sink::Sink
我正在使用一个定制的编解码器,该编解码器会生成MyType的项目,但为了忠于MRE中的M,我将使用Tokio的LinesCodec并针对这个问题说MyType = String

use futures::StreamExt; // 0.3.8
use tokio; // 1.0.1
use tokio_util; // 0.6.0

#[tokio::main]
pub async fn main() {
    let (mut tx, rx) = futures::channel::mpsc::unbounded::<String>();

    let mut process = tokio::process::Command::new("dmesg")
        .arg("-w")
        .stdout(std::process::Stdio::piped())
        .spawn()
        .unwrap();

    let stdout = process.stdout.take().unwrap();
    let codec = tokio_util::codec::LinesCodec::new();
    let framed_read = tokio_util::codec::FramedRead::new(stdout, codec);

    let forward = framed_read.forward(tx);

    // read from the other end of the channel
    tokio::spawn(async move {
        while let Some(line) = rx.next().await {
            eprintln!("{}", line);
        }
    });

    //forward.await;
}
但是,编译器报告错误类型不匹配:
error[E0271]: type mismatch resolving `<futures::channel::mpsc::UnboundedSender<String> as futures::Sink<String>>::Error == LinesCodecError`
  --> src/main.rs:19:31
   |
19 |     let forward = framed_read.forward(tx);
   |                               ^^^^^^^ expected struct `futures::channel::mpsc::SendError`, found enum `LinesCodecError`
假设我在这里没有做根本上错误的事情,如何正确处理/转换这些错误类型?
似乎有一个similar question asked before,但似乎是相反的情况, future 0.1,我怀疑它可能已经过时了,因为Rust异步生态系统中的变化如此之快。

最佳答案

流中的项目可能会失败(LinesCodecError),并且将值发送到 channel 可能会失败(SendError),但是整个转发过程只会导致单个错误类型。
您可以使用 SinkExt::sink_err_into TryStreamExt::err_into 将错误转换为兼容的统一类型。在这里,我选择了Box<dyn Error>:

type Error = Box<dyn std::error::Error>;

let forward = framed_read.err_into::<Error>().forward(tx.sink_err_into::<Error>());
在许多情况下,您将创建一个自定义错误类型。您也可能不需要像上面的示例那样使用turbofish,因为类型推断可能会在某个时候开始。
也可以看看:
  • How do you define custom `Error` types in Rust?
  • 关于rust - 从AsyncRead转发到 future 0.3 mpsc::UnboundedSender <T>时的错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65703532/

    相关文章:

    generics - 取决于特征的通用实现

    rust - 即使我有#[tokio::main],为什么仍然出现错误 “there is no reactor running, must be called from the context of Tokio runtime”?

    postgresql - 如何防止BB8连接在多次重复后断开

    rust - 特征 `std::future::Future` 未针对 `std::result::Result<reqwest::Response, reqwest::Error>` 实现

    loops - 在 Rust 中是否有与 JavaScript 的 forEach 等效的东西?

    rust - 为什么 tokio 在以奇怪的方式调用时只运行 n 个任务中的 n-1 个?

    rust - 如何从 Rust 中的 crate 为整个程序设置目标三元组?

    rust - clap::App 多次移动所有权的方法调用

    rust - 如何通过读取和转换文件来创建Stream?