stream - 如何通过 io::Write 特征写入来通过 futures Stream 发送数据?

标签 stream rust future

我有一个需要 &mut io::Write 的函数,我想用它从 actix-web 服务器发送流响应,而不必缓冲整个响应。该函数正在“推送”数据,我无法更改该函数(这是这个问题的整个前提)以使用异步流或其他类型的轮询。

目前我被迫使用 &mut Vec (它实现 io::Write)来缓冲整个结果,然后发送 Vec 作为响应主体。但是,响应可能很大,因此我宁愿在不缓冲的情况下对其进行流式传输。

是否有某种适配器可以实现 io::Write,并根据需要进行写入阻塞以响应背压,并与 actix-web 可用于响应的类型兼容(例如 futures::Stream)?

fn generate(output: &mut io::Write) {
    // ...
}

fn request_handler() -> Result<HttpResponse> {
    thread::spawn(|| generate(/*???*/));
    Ok(HttpResponse::Ok().body(/*???*/))
}

std::sync::mpscfutures::mpsc 要么两端异步,要么两端阻塞,因此如何将它们用作同步和异步之间的适配器结束。

最佳答案

这是可能的。关键部分是futures::sink::Wait :

A sink combinator which converts an asynchronous sink to a blocking sink.

Created by the Sink::wait method, this function transforms any sink into a blocking version. This is implemented by blocking the current thread when a sink is otherwise unable to make progress.

所需要做的就是将此类型包装在实现 io::Write 的结构中:

use futures::{
    sink::{Sink, Wait},
    sync::mpsc,
}; // 0.1.26
use std::{io, thread};

fn generate(_output: &mut io::Write) {
    // ...
}

struct MyWrite<T>(Wait<mpsc::Sender<T>>);

impl<T> io::Write for MyWrite<T>
where
    T: for<'a> From<&'a [u8]> + Send + Sync + 'static,
{
    fn write(&mut self, d: &[u8]) -> io::Result<usize> {
        let len = d.len();
        self.0
            .send(d.into())
            .map(|()| len)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0
            .flush()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }
}

fn foo() -> impl futures::Stream<Item = Vec<u8>, Error = ()> {
    let (tx, rx) = mpsc::channel(5);

    let mut w = MyWrite(tx.wait());

    thread::spawn(move || generate(&mut w));

    rx
}

关于stream - 如何通过 io::Write 特征写入来通过 futures Stream 发送数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55708392/

相关文章:

generics - 有什么方法可以让 Rust 理解 `fn(T) -> impl Future` 总是返回相同的类型?

c++ - std::future 的错误用法?

java - 比较填充有不同种类对象的两个集合

cocoa - 以编程方式录制发送到内置输出的声音,Mac OS X

c# - IPC 使用 Protobuf 和内存映射文件 C#

node.js - 从nodejs中的stdin读取强制将\r\n转换为\n

rust - 使用特征重载方法

rust - macro_rules 是一个普通的宏吗?

Rust 不打印到终端

java - 如何通过内部函数向 Action 函数返回异常?