我使用 vertX 和 RxJava 启动了一个项目,但遇到了一个问题,但没有找到解决方案。
我有一个 Observable,它为传入通信发出 WebSocketFrame, 每个 WebSocketFrame 都由有效负载(ByteBuffer)和指示它是消息的第一帧还是最后一帧的标志组成。
我想对此 Observable 进行操作,将其转换为发出包含每条消息的所有帧的 ByteBufferd 的 Observable。
我尝试了 buffer
方法,但它似乎被设计为按任意标准(时间或其他可观察的)重新组合项目。
另一种方法似乎是使用 compose
订阅 WebSocketFrame observable,添加到非结束帧上的缓冲区,并在结束帧上“馈送”ByteBuffer Observable。但我不知道如何手动创建和提供缓冲区。
因此,如果有人已经看到这个问题(恕我直言,这似乎很常见)并且对 RxJava 有足够的了解来提出实现方案,我将不胜感激。
感谢您的阅读。
最佳答案
我猜你必须使用 buffer
运算符(也许你可以使用更简单的 buffer
,但我不确定)。另请参阅this other question涵盖大致相同的主题和 this GitHub page进行更多讨论。希望这对您有帮助!
关于rx-java - RxJava Observable 基于内容的缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38308160/