rust - 不能在 mpsc::channel 上使用 Stream::take_while: bool: future 不满足

标签 rust future rust-tokio

<分区>

我想在一个线程中运行事件循环并处理来自 UDP 套接字的数据,直到另一个线程发出停止工作的信号。

这对我来说是一项艰巨的任务,所以我想从一个更简单的任务开始: 一个线程启动事件循环并等待另一个线程发出结束信号:

use futures::{future, future::Future, stream::Stream, sync::mpsc};
use std::{io, io::BufRead, thread};

fn main() {
    let (mut tx, rx) = mpsc::channel::<bool>(1);

    let thr = thread::spawn(|| {
        let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
        runtime.spawn(
            future::lazy(|| {
                println!("event loop started");
                Ok(())
            })
            .and_then(rx.take_while(|x| *x == true).into_future()),
        );

        runtime.run()
    });

    let stdin = io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        println!("{}", line);
        if line == "exit" {
            tx.try_send(false).unwrap();
            break;
        }
    }
    thr.join().unwrap().unwrap();
}

此代码无法编译:

error[E0277]: the trait bound `bool: futures::future::Future` is not satisfied
  --> src/main.rs:14:26
   |
14 |             .and_then(rx.take_while(|x| *x == true).into_future()),
   |                          ^^^^^^^^^^ the trait `futures::future::Future` is not implemented for `bool`
   |
   = note: required because of the requirements on the impl of `futures::future::IntoFuture` for `bool`

error[E0599]: no method named `into_future` found for type `futures::stream::take_while::TakeWhile<futures::sync::mpsc::Receiver<bool>, [closure@src/main.rs:14:37: 14:51], bool>` in the current scope
  --> src/main.rs:14:53
   |
14 |             .and_then(rx.take_while(|x| *x == true).into_future()),
   |                                                     ^^^^^^^^^^^
   |
   = note: the method `into_future` exists but the following trait bounds were not satisfied:
           `futures::stream::take_while::TakeWhile<futures::sync::mpsc::Receiver<bool>, [closure@src/main.rs:14:37: 14:51], bool> : futures::stream::Stream`
           `&mut futures::stream::take_while::TakeWhile<futures::sync::mpsc::Receiver<bool>, [closure@src/main.rs:14:37: 14:51], bool> : futures::stream::Stream`

如何修复编译错误?

最佳答案

阅读并理解您尝试使用的方法的文档和函数签名:

fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R>
where
    P: FnMut(&Self::Item) -> R,
    R: IntoFuture<Item = bool, Error = Self::Error>,
    Self: Sized, 

take_while 接受一个闭包,该闭包返回某种必须可转换为 future 的类型; bool 不可转换为 future 。最简单的方法是通过 future::ok :

let thr = thread::spawn(|| {
    let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
    runtime.spawn({
        rx.take_while(|&x| future::ok(x))
            .for_each(|x| {
                println!("{}", x);
                future::ok(())
            })
    });

    runtime.run()
});

另见:

But my problem also in joining future::lazy and rx.take_while

这与您询问的问题无关。再次,我们查看文档,这次是 Future::and_then :

fn and_then<F, B>(self, f: F) -> AndThen<Self, B, F>
where
    F: FnOnce(Self::Item) -> B,
    B: IntoFuture<Error = Self::Error>,
    Self: Sized, 

类似于take_while,它需要一个闭包并且闭包必须返回可以转换为 future 的东西。您的代码不提供闭包。

然后看Stream::into_future .这将返回 a type that implements Future并返回一个元组。元组中的第一项是流中的单个值,第二项是流本身,以允许获取更多值。

为了让所有项目和错误类型正确,我自由使用了 map(drop)map_err(drop) — 你会想要做一些事情更适合您的数据和错误处理。

runtime.spawn({
    future::lazy(|| {
        println!("event loop started");
        Ok(())
    })
    .and_then(|_| {
        rx.take_while(|&x| future::ok(x))
            .into_future()
            .map(drop)
            .map_err(drop)
    })
    .map(drop)
});

真的,你应该只使用 oneshot channel ;这要简单得多:

use futures::{
    future::{self, Future},
    sync::oneshot,
};
use std::thread;

fn main() {
    let (tx, rx) = oneshot::channel();

    let thr = thread::spawn(|| {
        let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();

        runtime.spawn({
            future::lazy(|| {
                println!("event loop started");
                Ok(())
            })
            .and_then(|_| rx.map_err(drop))
        });

        runtime.run()
    });

    let lines = ["hello", "goodbye", "exit"];
    for &line in &lines {
        if line == "exit" {
            tx.send(()).unwrap();
            break;
        }
    }

    thr.join().unwrap().unwrap();
}

关于rust - 不能在 mpsc::channel 上使用 Stream::take_while: bool: future 不满足,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54242732/

相关文章:

rust - 什么是 Rust 类型关键字?

swift - Promise.onSuccess 立即调用

c++ - 在 C++11 中使用 futures、async 和 thread 实现搜索

rust - Rust 单元测试错误 : "The async keyword is missing from the function declaration"

rust-tokio - 如何将超响应正文写入文件?

asynchronous - 从 UdpSocket 异步读取

rust - 如何将不同的类实例分配给 Rust 中的变量?

types - 强类型枚举作为 rust 中的联合 : How to determine type of value and retrieve it; How to do "constructors"

rust - 有没有办法在不重复变量名的情况下将命名参数传递给格式宏?

rust - 匹配 future 类型