java - 为什么我的 RxJava Observable 除非阻塞,否则不会发出或完成?

标签 java rx-java reactivex

背景

我有许多 RxJava Observables(从 Jersey 客户端生成,或者使用 Observable.just(someObject) 生成)。它们都应该只发出一个值。我有一个模拟所有 Jersey 客户端并使用 Observable.just(someObject) 的组件测试,我看到了与运行生产代码时相同的行为。

我有几个类作用于这些 observables,执行一些计算(以及一些副作用 - 我可能会让它们稍后直接返回值)并返回空的 void observables。

有一次,在一个这样的类中,我试图压缩我的几个源可观察量然后映射它们 - 如下所示:

public Observable<Void> doCalculation() {
    return Observable.zip(
        getObservable1(),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}

// in Unifying Object
public Observable<Void> processToNewObservable() {
    // ... do some calculation ...
    return Observable.empty();
}

计算类然后全部合并并等待:

// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
        .toBlocking().lastOrDefault(null);

问题

问题是,processToNewObservable() 永远不会被执行。通过消除过程,我可以看到问题出在 getObservable1() - 如果我用 Observable.just(null) 替换它,一切都会按照我想象的那样执行(但在我想要一个真实值的地方有一个空值)。

重申一下,getObservable1() 在生产代码中从 Jersey 客户端返回一个 Observable,但该客户端是一个 Mockito 模拟,在我的代码中返回 Observable.just(someValue)测试。

调查

如果我将 getObservable1() 转换为阻塞,然后将第一个值包装在 just() 中,同样,一切都按照我的想象执行(但我不不想引入阻塞步骤):

Observable.zip(
    Observable.just(getObservable1().toBlocking().first()),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

我的第一个想法是,也许其他东西正在消耗我的 observable 发出的值,而 zip 看到它已经完成,因此确定压缩它们的结果应该是一个空的 observable .我已经尝试将 .cache() 添加到我认为相关的每个可观察源上,但是,这并没有改变行为。

我还尝试在 zip 之前在 getObservable1 上添加 next/error/complete/finally 处理程序(不将其转换为阻塞),但它们都没有执行:

getObservable1()
    .doOnNext(...)
    .doOnCompleted(...)
    .doOnError(...)
    .finallyDo(...);

Observable.zip(
    getObservable1(),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

问题

我是 RxJava 的新手,所以我很确定我错过了一些基本的东西。问题是:我能做什么蠢事?如果从我目前所说的来看这不是很明显,我可以做些什么来帮助诊断问题?

最佳答案

Observable 必须发射才能启动链。您必须将管道视为 Observable 发射时将发生什么的声明。

您没有分享实际观察到的内容,但 Observable.just() 导致 Observable 立即发出包装对象。

关于java - 为什么我的 RxJava Observable 除非阻塞,否则不会发出或完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34257631/

相关文章:

android - RecyclerView 使用 RxJava 无限滚动

rx-java - RXJava/Kotlin - 链接单个结果为一个

javascript - 在函数返回之前访问它自己的对象属性

java - RxJava - 为什么执行器只使用一个线程

javascript - 如何在 rxjs 中为每个订阅在可观察管道上执行一次初始化逻辑

java - POSTed JSON 中的实体关系与 Spring Boot

java - scribe 在 oauth 2.0 中不支持 refresh_token 对吗?

java - 无法解析方法值?

android - 使用 RxJava 在 Activity 之间共享 Observable 并取消订阅

java - intellij 中的 URL myweb_war_exploded 出现问题