rust - 如何通过读取和转换文件来创建Stream?

标签 rust rust-tokio hyper

我正在尝试读取文件,解密文件并返回数据。因为文件可能很大,所以我想在流中执行此操作。

我找不到实现流的好的模式。我正在尝试做这样的事情:

let stream = stream::unfold(decrypted_init_length, |decrypted_length| async move {
    if decrypted_length < start + length {
        let mut encrypted_chunk = vec![0u8; encrypted_block_size];
        match f.read(&mut encrypted_chunk[..]) {
            Ok(size) => {
                if size > 0 {
                    let decrypted = my_decrypt_fn(&encrypted_chunk[..]);
                    let updated_decrypted_length = decrypted_length + decrypted.len();
                    Some((decrypted, updated_decrypted_length))
                } else {
                    None
                }
            }
            Err(e) => {
                println!("Error {}", e);
                None
            }
        }
    } else {
        None
    }
});

问题是上述异步关闭中不允许f.read并出现以下错误:

89  | |             match f.read(&mut encrypted_chunk[..]) {
    | |                   -
    | |                   |
    | |                   move occurs because `f` has type `std::fs::File`, which does not implement the `Copy` trait
    | |                   move occurs due to use in generator

我不想在闭包本身内部打开f。有没有更好的方法来解决此问题?我可以使用其他的 crate 或特征或方法(即,不是stream::unfold)。

最佳答案

我找到了一个解决方案:在here使用async-stream crate 。
stream::unfold对我不起作用的原因之一是async move闭包不允许外部访问mut变量,例如f文件句柄。

现在使用async-stream,我将代码更改为以下代码,并且可以正常工作:(请注意此 crate 添加的yield)。

use async_stream::try_stream;

<snip>

    try_stream! {
        while decrypted_length < start + length {
            match f.read(&mut encrypted_chunk[..]) {
                Ok(size) => 
                    if size > 0 {
                        println!("read {} bytes", size);
                        let decrypted = my_decrypt_fn(&encrypted_chunk[..size], ..);
                        decrypted_length = decrypted_length + decrypted.len();
                        yield decrypted;
                    } else {
                        break
                    }
                Err(e) => {
                    println!("Error {}", e);
                    break
                }
            }
        }
    }

更新:

我发现async-stream有一些限制,我不能忽略。我最终直接实现了Stream,不再使用async-stream。现在我的代码如下所示:
pub struct DecryptFileStream {
    f: File,
    <other_fields>,
}

impl Stream for DecryptFileStream {
    type Item = io::Result<Vec<u8>>;

    fn poll_next(self: Pin<&mut Self>,
                  _cx: &mut Context<'_>) -> Poll<Option<io::Result<Vec<u8>>>> {
         // read the file `f` of self and business_logic
         // 
         if decrypted.len() > 0 {
             Poll::Ready(Some(Ok(decrypted)))
         } else {
             Poll::Ready(None)
         }
    }
}

//. then use the above stream: 

    let stream = DecryptFileStream::new(...);
    Response::new(Body::wrap_stream(stream))

关于rust - 如何通过读取和转换文件来创建Stream?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60876434/

相关文章:

rust - 如何使用 Tokio 远程关闭正在运行的任务

lambda - 为什么我可以将对非静态局部变量的引用传递给具有“静态绑定(bind)”的函数?

rust - 如何从产生数据 block 的慢速处理侧线程流式传输 super 请求的主体?

rust - 如何在 Rust 中获取向量中的最小值?

multithreading - 有没有办法在 Rust 中生成具有指定生命周期的线程?

rust - 我们如何在 Rust 中检测主机操作系统类型(而不是目标操作系统)?

rust - 如何从 tokio-proto 连接握手中检索信息?

rust - 如何在不使用 tokio_proto crate 的情况下从 tokio TCP 连接读取数据?

rust - 无法读取通过 hyper::client::Client 发出 HTTP 请求的简单负载:不满足特征绑定(bind) `Body: Future`

vector - 如何创建堆栈分配的类似矢量的容器?