java - RxJava : combine two optional observables

标签 java rx-java reactive-programming rx-java2 reactive

我有两个 Observable,我们称它们为 PeanutButterJelly。我想将它们组合成一个 Sandwich Observable。我可以使用:

Observable<PeanutButter> peanutButterObservable = ...;
Observable<Jelly> jellyObservable = ...;
Observable<Sandwich> sandwichObservable = Observable.combineLatest(
    peanutButterObservable,
    jellyObservable,
    (pb, j) -> makeSandwich(pb, j))

问题是 RX 在发出第一个组合的 Sandwich 之前等待第一个 PeanutButter 和第一个 Jelly 被发出,但是 Jelly 可能永远不会发出,这意味着我永远不会得到第一个 Sandwich

我想合并这两个提要,以便在任一提要发出第一个项目时立即发出组合项目,而不管另一个提要是否尚未发出任何东西,我如何在 RxJava 中做到这一点?

最佳答案

一种可能的方法是使用 startWith 运算符在订阅时从每个流中触发已知值的发射。这样 combineLatest() 将在任一流发出值时触发。您只需要注意在 onNext 消费者中寻找初始/信号值。

像这样...:

@Test
public void sandwiches() {
    final Observable<String> peanutButters = Observable.just("chunky", "smooth")
        .startWith("--initial--");

    final Observable<String> jellies = Observable.just("strawberry", "blackberry", "raspberry")
        .startWith("--initial--");

    Observable.combineLatest(peanutButters, jellies, (peanutButter, jelly) -> {
        return new Pair<>(peanutButter, jelly);
    })
    .subscribe(
        next -> {
            final String peanutButter = next.getFirst();
            final String jelly = next.getSecond();

            if(peanutButter.equals("--initial--") && jelly.equals("--initial--")) {
                // initial emissions
            } else if(peanutButter.equals("--initial--")) {
                // jelly emission
            } else if(jelly.equals("--initial--")) {
                // peanut butter emission
            } else {
                // peanut butter + jelly emissions
            }
        },
        error -> {
            System.err.println("## onError(" + error.getMessage() + ")");
        },
        () -> {
            System.out.println("## onComplete()");
        }
    );
}

关于java - RxJava : combine two optional observables,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47931923/

相关文章:

java - 为什么 RxJava Observable.doOnDispose 会触发两次?

java - RxJava 棘手的 startWith(Observable)

programming-languages - 函数响应式(Reactive)编程的 'Signal' 表示是否正确?

java - 如何使用嵌入式 JavaFx 小程序更新 Swing 中的饼图 - 空点异常

java - Log4j 挂起我的应用程序

java - 从照片中裁剪自定义形状

java - 从 Java 中的子字符串中高效解析整数

android - RXJava 处理嵌套调用

android - RxJava 链请求和更新 UI

meteor - 类似 Angular 的客户端数据绑定(bind)和与 Meteor 的 react 性?