Rust future -- 将两个 future 结合在一起

标签 rust

我有这个代码

use futures::Map;
use futures::sink::SendAll;
use futures::sink::SinkFromErr;
use futures::stream::Forward;
use futures::sync::mpsc::Receiver;
use futures::sync::mpsc::Sender;
use futures::{Future, Stream, Sink};
use std::boxed::FnBox;
use tokio_core::reactor::Core;
use websocket::async::futures::stream::SplitSink;
use websocket::async::futures::stream::SplitStream;
use websocket::ClientBuilder;
use websocket;

pub fn main(recv: Receiver<String>, send: Sender<websocket::OwnedMessage>) -> Box<FnBox() -> () + Send> {
    Box::new(move || {
        let mut core = Core::new().unwrap();
        let handle = core.handle();

        let f = ClientBuilder::new("wss://...")
            .unwrap()
            .async_connect(None, &handle)
            .from_err::<Error>()
            .map(|(duplex, _)| duplex.split())
            .and_then(|(sink, stream): (SplitSink<_>, SplitStream<_>)| {

                let writer: Map<SendAll<SinkFromErr<SplitSink<_>, _>, _>, _> =
                    sink
                    .sink_from_err()
                    .send_all(recv.map(websocket::OwnedMessage::Text).map_err(Error::Receiver))
                    .map(|_| ());

                // Trying to uncomment these lines:
                // let reader =
                //     stream
                //     .forward(send);
                //
                // reader.join(writer)

                // Comment this out:
                writer
            });

        core.run(f).expect("Unable to run");
    })
}

quick_error! {
    #[derive(Debug)]
    pub enum Error {
        WebSocket(err: websocket::WebSocketError) {
            from()
            description("websocket error")
            display("WebSocket error: {}", err)
            cause(err)
        }
        Receiver(err: ()) {
            description("receiver error")
            display("Receiver error")
        }
    }
}

为了清楚起见,我添加了一些类型注释。这个版本编译,但我想要另一个 future ,从流 (stream) 读取并写入 send。我不能让它编译,我输入的错误是完全无法理解的。所以我的问题是:

  1. 如何编译 forward() 调用? (尝试启用注释掉的代码)
  2. 您是如何想出编译的代码的?以我的经验,由于类型太复杂且类型错误难以理解,因此无法编写和理解大量 futures 代码。

最佳答案

我需要两次 map_err 调用,一次用于映射来自 stream 的 websocket 错误,一次用于映射来自 send 的发送者错误:

pub fn main(recv: Receiver<String>, send: Sender<websocket::OwnedMessage>) -> Box<FnBox() -> () + Send> {
    Box::new(move || {
        let mut core = Core::new().unwrap();
        let handle = core.handle();

        let f = ClientBuilder::new("wss://...")
            .unwrap()
            .async_connect(None, &handle)
            .from_err::<Error>()
            .map(|(duplex, _)| duplex.split())
            .and_then(|(sink, stream): (SplitSink<_>, SplitStream<_>)| {

                let writer: Map<SendAll<SinkFromErr<SplitSink<_>, _>, _>, _> =
                    sink
                    .sink_from_err()
                    .send_all(recv.map(websocket::OwnedMessage::Text).map_err(Error::Receiver))
                    .map(|_| ());

                let reader =
                    stream
                    .map_err(Error::WebSocket)
                    .forward(send.sink_map_err(Error::Sender));

                reader.join(writer)
            });

        core.run(f).expect("Unable to run");
    })
}

关于Rust future -- 将两个 future 结合在一起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46569399/

相关文章:

rust - "Sized is not implemented"是什么意思?

rust - 需要拥有 self 所有权的典型用例是什么?

ecmascript-6 - 如何在不每次都初始化模块的情况下在 Vue 组件中加载 WASM 模块?

rust - 如何使用io::Cursor和字节顺序读取混合的二进制/文本文件?

enums - 如何结合枚举变量和值的测试?

rust - 如何在 Rust 中为二叉树编写删除函数?

rust - 迭代递归结构时无法获取可变引用 : cannot borrow as mutable more than once at a time

rust - 如何在 Iron 中使用服务器发送的事件?

floating-point - 如何获取包含 float 的迭代器的最小值或最大值?

json - Rust Serde - 是否可以将两种不同布局中的 json 数据映射回单个结构?