rust - 如何复制Rust Stream

标签 rust rust-tokio

我有一个类似于以下程序:

struct MyEvent { /* some fields */ }
struct MyStruct { /* some fields */ }
struct MyStreamer { /* holds some state */ }

impl MyStreamer {
    pub fn stream_objects<'a, 'b: 'a>(
        &'a self,
        event_stream: Pin<Box<dyn Stream<Item = MyEvent> + Send + 'b>>,
    ) -> Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'a>> { /* implementation */ }
}
目标是处理事件并构建MyStruct流。然后,我有两个使用者使用MyStruct流,而我正努力复制它。
我正在尝试编写以下函数(另请参见我的尝试实现):
pub fn duplicate_stream<'a, 'b: 'a>(
    &'a self,
    struct_stream: Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'b>>,
) -> (
   Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'b>>,
   Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'b>>
) {
        let (s1, r1) = mpsc::unbounded::<Arc<MyStruct>>();
        let (s2, r2) = mpsc::unbounded::<Arc<MyStruct>>();

        let s = s1.fanout(s2);

        let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });

        (r1.boxed(), r2.boxed())
}
在这一点上,我被告知以下内容:
    |
155 |         struct_stream: Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'b>>,
    |                               ----------------------------------------------- this data with lifetime `'b`...
...
165 |         let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
    |                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...is captured here...
    |
note: ...and is required to live as long as `'static` here
   --> *file name here*
    |
165 |         let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
    |                       ^^^^^^^^^^^^
我可以删除生存期,但是会推断出static,并且在调用方中出现错误。
我很好奇理解最好的方法是克隆流的所有元素并获得两个相同的流。使用tokio::spawnmpsc channel 似乎需要将static的生存期更改很多。

最佳答案

我认为这是XY problem
编译器是正确的,您可能应该考虑生存期,因为扇出状态还可以。tokio::spawn需要'static,并且您已为'b指定了struct_stream生存期。也许将struct_stream包装在Arc/Rc中?

关于rust - 如何复制Rust Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65266993/

相关文章:

rust - 指定 Cargo 项目所需的 rustc 版本

rust - 如何在循环中更新可变引用?

rust - 具有阴影和 String -> &str 转换的 'let' 范围

rust - tokio_core::net::UdpCodec 在关联类型上具有生命周期

rust - 如何在循环中生成异步方法?

rust - 某些 tokio 功能的导入未解决

c++ - 简单的 rust 通用/模板添加功能

rust - 运行当前目录之外的 Rust 程序

rust - 什么是 futures::future::lazy?

rust - 可中止 : Dangling Futures?