java - RxJava : combining multiple streams of objects based on a certain condition

标签 java rx-java

我有多个通知源:A、B、C(它们都生成不同类型的对象,但具有一些共同的属性),当满足特定条件时,我想将它们合并到新的 Stream 中。

流示例:

A: [ ObjectA(id=1), ObjectA(id=2), ObjectA(id=3), ObjectA(id=5), ObjectA(id=4)]
B: [ ObjectB(id=1), ObjectB(id=2), ObjectB(id=3), ObjectB(id=4), ObjectB(id=5)]
C: [ ObjectC(id=1), ObjectC(id=2), ObjectC(id=3), ObjectC(id=4), ObjectC(id=5)]

所需的输出:

Result: [ ObjectABC(id=1), ObjectABC(id=2), ObjectABC(id=3), ObjectA(id=4), ObjectA(id=5)]

ObjectAObjectBObjectC 具有以下属性时,每个 ObjectABC 将被创建并添加到结果流中收到相同的 id。

如果没有找到匹配,它应该等到找到具有相同 Id 的三元组。

在示例中,当收到的当前三元组项目不匹配时(例如第四次迭代 A 的第四个项目的 Id = 5 ),应保留它,直到来自其他流的匹配项目在一段时间后被处理或丢弃时间。

这可以通过 RxJava 实现吗?

最佳答案

我手头没有 IDE,但这应该可以解释逻辑:

// First, extract all of the IDs as they arrive
final Observable<Integer> ids = as.map(i -> i.id).distinct();

// Then, for each ID, extract the ObjectA, ObjectB and ObjectC instances
// and zip them together. 
final Observable<ObjectABC> abcs = ids.flatMap(id -> Observable.zip(
    as.filter(a -> a.id.equals(id)).take(1), 
    bs.filter(b -> b.id.equals(id)).take(1), 
    cs.filter(c -> c.id.equals(id)).take(1),
    (a, b, c) -> new ObjectABC(a, b, c));

如果您想强制排序:

abcs.sort(Comparator.comparing(abc -> abc.id)); 

关于java - RxJava : combining multiple streams of objects based on a certain condition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45401165/

相关文章:

java - 应用程序重启时的文件读取偏移量

java - 在java中的链表中删除节点之前如何存储节点?

java - Java+Kotlin Android 项目中 Kotlin 文件中 Unresolved 引用错误

java - 以功能 react 方式合并 2 个列表

java - 我如何将来自不同 observable 的 rx.Observable 搜索结果合并到一个 Observable 中?

java - java中添加到arrayList时,arrayList中的所有元素均为null

java - 你如何修剪 BigDecimal 除法结果

rx-java - 无法解析 rxjava 2 中的 Observable.from 方法

android - RxJava 分别和一起订阅多个 observable

java - RxJava 发出多个错误