我有两个 Observable
,我们称它们为 PeanutButter
和 Jelly
。我想将它们组合成一个 Sandwich
Observable
。我可以使用:
Observable<PeanutButter> peanutButterObservable = ...;
Observable<Jelly> jellyObservable = ...;
Observable<Sandwich> sandwichObservable = Observable.combineLatest(
peanutButterObservable,
jellyObservable,
(pb, j) -> makeSandwich(pb, j))
问题是 RX 在发出第一个组合的 Sandwich
之前等待第一个 PeanutButter
和第一个 Jelly
被发出,但是 Jelly
可能永远不会发出,这意味着我永远不会得到第一个 Sandwich
。
我想合并这两个提要,以便在任一提要发出第一个项目时立即发出组合项目,而不管另一个提要是否尚未发出任何东西,我如何在 RxJava 中做到这一点?
最佳答案
一种可能的方法是使用 startWith
运算符在订阅时从每个流中触发已知值的发射。这样 combineLatest()
将在任一流发出值时触发。您只需要注意在 onNext
消费者中寻找初始/信号值。
像这样...:
@Test
public void sandwiches() {
final Observable<String> peanutButters = Observable.just("chunky", "smooth")
.startWith("--initial--");
final Observable<String> jellies = Observable.just("strawberry", "blackberry", "raspberry")
.startWith("--initial--");
Observable.combineLatest(peanutButters, jellies, (peanutButter, jelly) -> {
return new Pair<>(peanutButter, jelly);
})
.subscribe(
next -> {
final String peanutButter = next.getFirst();
final String jelly = next.getSecond();
if(peanutButter.equals("--initial--") && jelly.equals("--initial--")) {
// initial emissions
} else if(peanutButter.equals("--initial--")) {
// jelly emission
} else if(jelly.equals("--initial--")) {
// peanut butter emission
} else {
// peanut butter + jelly emissions
}
},
error -> {
System.err.println("## onError(" + error.getMessage() + ")");
},
() -> {
System.out.println("## onComplete()");
}
);
}
关于java - RxJava : combine two optional observables,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47931923/