我有一个类似于以下程序:
struct MyEvent { /* some fields */ }
struct MyStruct { /* some fields */ }
struct MyStreamer { /* holds some state */ }
impl MyStreamer {
pub fn stream_objects<'a, 'b: 'a>(
&'a self,
event_stream: Pin<Box<dyn Stream<Item = MyEvent> + Send + 'b>>,
) -> Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'a>> { /* implementation */ }
}
目标是处理事件并构建MyStruct
流。然后,我有两个使用者使用MyStruct
流,而我正努力复制它。我正在尝试编写以下函数(另请参见我的尝试实现):
pub fn duplicate_stream<'a, 'b: 'a>(
&'a self,
struct_stream: Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'b>>,
) -> (
Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'b>>,
Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'b>>
) {
let (s1, r1) = mpsc::unbounded::<Arc<MyStruct>>();
let (s2, r2) = mpsc::unbounded::<Arc<MyStruct>>();
let s = s1.fanout(s2);
let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
(r1.boxed(), r2.boxed())
}
在这一点上,我被告知以下内容: |
155 | struct_stream: Pin<Box<dyn Stream<Item = Arc<MyStruct>> + Send + 'b>>,
| ----------------------------------------------- this data with lifetime `'b`...
...
165 | let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...is captured here...
|
note: ...and is required to live as long as `'static` here
--> *file name here*
|
165 | let _handle = tokio::spawn(async move { struct_stream.map(Ok).forward(s).await });
| ^^^^^^^^^^^^
我可以删除生存期,但是会推断出static
,并且在调用方中出现错误。我很好奇理解最好的方法是克隆流的所有元素并获得两个相同的流。使用
tokio::spawn
和mpsc
channel 似乎需要将static
的生存期更改很多。
最佳答案
我认为这是XY problem
编译器是正确的,您可能应该考虑生存期,因为扇出状态还可以。tokio::spawn
需要'static
,并且您已为'b
指定了struct_stream
生存期。也许将struct_stream
包装在Arc/Rc
中?
关于rust - 如何复制Rust Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65266993/