rx-java - RxJava Observable 基于内容的缓冲区

标签 rx-java observable

我使用 vertX 和 RxJava 启动了一个项目,但遇到了一个问题,但没有找到解决方案。

我有一个 Observable,它为传入通信发出 WebSocketFrame, 每个 WebSocketFrame 都由有效负载(ByteBuffer)和指示它是消息的第一帧还是最后一帧的标志组成。

我想对此 Observable 进行操作,将其转换为发出包含每条消息的所有帧的 ByteBufferd 的 Observable。

我尝试了 buffer 方法,但它似乎被设计为按任意标准(时间或其他可观察的)重新组合项目。

另一种方法似乎是使用 compose 订阅 WebSocketFrame observable,添加到非结束帧上的缓冲区,并在结束帧上“馈送”ByteBuffer Observable。但我不知道如何手动创建和提供缓冲区。

因此,如果有人已经看到这个问题(恕我直言,这似乎很常见)并且对 RxJava 有足够的了解来提出实现方案,我将不胜感激。

感谢您的阅读。

最佳答案

我猜你必须使用 buffer运算符(也许你可以使用更简单的 buffer ,但我不确定)。另请参阅this other question涵盖大致相同的主题和 this GitHub page进行更多讨论。希望这对您有帮助!

关于rx-java - RxJava Observable 基于内容的缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38308160/

相关文章:

java - 如何终止一个 Observable?

android - 如何在 RxJava 中刷新 observable?

Angular 2 Observable with ChangeDetectionStrategy.OnPush 和异步管道不工作

java - Rx-java 2 : How to set priority between Observables?

Android AsyncTask vs 线程 + 处理程序 vs rxjava

android - RxJava Android toMap 运算符停止执行

javascript - 如何从 Angular 中的 BehaviouralSubject 中提取值

angular - Observable.forkJoin 和数组参数

python - 有没有办法将回调与列表的更改相关联?

ngOnDestroy 中缺少带有 takeUntil : What happens when . next() 的 Angular Observable 销毁