stream - 我如何在 Rust 中使用 `flatmap` 流?

标签 stream rust rust-actix

我有一个 rusoto_core::ByteStream 它实现了 futures' Stream trait :

let chunks = vec![b"1234".to_vec(), b"5678".to_vec()];
let stream = ByteStream::new(stream::iter_ok(chunks));

我想将其传递给 actix_web's HttpResponseBuilder::streaming 方法。

use actix_web::dev::HttpResponseBuilder; // 0.7.18
use rusoto_core::ByteStream; // 0.36.0

fn example(stream: ByteStream, builder: HttpResponseBuilder) {
    builder.streaming(stream);
}

当我尝试这样做时,我收到以下错误:

error[E0271]: type mismatch resolving `<rusoto_core::stream::ByteStream as futures::stream::Stream>::Item == bytes::bytes::Bytes`
 --> src/main.rs:5:13
  |
5 |     builder.streaming(stream);
  |             ^^^^^^^^^ expected struct `std::vec::Vec`, found struct `bytes::bytes::Bytes`
  |
  = note: expected type `std::vec::Vec<u8>`
             found type `bytes::bytes::Bytes`

我相信原因是streaming()期望一个 S: Stream<Item = Bytes, Error> (即 Item = Bytes )但是我的 ByteStreamItem = Vec<u8> .我该如何解决?

我认为解决方案是 flatmap我的ByteStream不知何故,但我找不到这样的流方法。

这是一个例子 streaming()可以使用:

let text = "123";
let (tx, rx_body) = mpsc::unbounded();
let _ = tx.unbounded_send(Bytes::from(text.as_bytes()));

HttpResponse::Ok()
    .streaming(rx_body.map_err(|e| error::ErrorBadRequest("bad request")))

最佳答案

How can I flatmap streams in Rust?

平面映射将迭代器的迭代器转换为单个迭代器(或流而不是迭代器)。

future 0.3

Futures 0.3 没有直接平面图,但它有 StreamExt::flatten , 可以在 StreamExt::map 之后使用.

use futures::{stream, Stream, StreamExt}; // 0.3.1

fn into_many(i: i32) -> impl Stream<Item = i32> {
    stream::iter(0..i)
}

fn nested() -> impl Stream<Item = i32> {
    let stream_of_number = into_many(5);
    let stream_of_stream_of_number = stream_of_number.map(into_many);
    let flat_stream_of_number = stream_of_stream_of_number.flatten();

    // Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
    flat_stream_of_number
}

future 0.1

Futures 0.1 没有直接平面图,但它有 Stream::flatten , 可以在 Stream::map 之后使用.

use futures::{stream, Stream}; // 0.1.25

fn into_many(i: i32) -> impl Stream<Item = i32, Error = ()> {
    stream::iter_ok(0..i)
}

fn nested() -> impl Stream<Item = i32, Error = ()> {
    let stream_of_number = into_many(5);
    let stream_of_stream_of_number = stream_of_number.map(into_many);
    let flat_stream_of_number = stream_of_stream_of_number.flatten();

    // Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
    flat_stream_of_number
}

但是,这并不能解决您的问题

streaming() expects a S: Stream<Item = Bytes, Error> (i.e., Item = Bytes) but my ByteStream has Item = Vec<u8>

是的,这就是问题所在。使用 Bytes::from 通过 Stream::map 转换您的流 Item从一种类型到另一种类型:

use bytes::Bytes; // 0.4.11
use futures::Stream; // 0.1.25 

fn example(stream: ByteStream, mut builder: HttpResponseBuilder) {
    builder.streaming(stream.map(Bytes::from));
}

关于stream - 我如何在 Rust 中使用 `flatmap` 流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54441319/

相关文章:

delphi - 将GIF从资源加载到动态表单

php - 通过 PHP 提供 C 程序,无需写入/上传权限

rust - 如何创建和写入内存映射文件?

rust - 如何在actix_web单元测试中获取响应的主体?

rust - 如何访问 Actix-web 中 Future 中的 HttpRequest 数据?

VB.NET 应用程序卡住且未运行

c# - 如何循环读取文件的每一行?

rust - 如何进行批量渲染?

pointers - 分配给 *mut T 和 &mut T 有什么区别?

rust - 如何在 actix-web 中执行异步函数?