背景
我有许多 RxJava Observables(从 Jersey 客户端生成,或者使用 Observable.just(someObject)
生成)。它们都应该只发出一个值。我有一个模拟所有 Jersey 客户端并使用 Observable.just(someObject)
的组件测试,我看到了与运行生产代码时相同的行为。
我有几个类作用于这些 observables,执行一些计算(以及一些副作用 - 我可能会让它们稍后直接返回值)并返回空的 void observables。
有一次,在一个这样的类中,我试图压缩我的几个源可观察量然后映射它们 - 如下所示:
public Observable<Void> doCalculation() {
return Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}
// in Unifying Object
public Observable<Void> processToNewObservable() {
// ... do some calculation ...
return Observable.empty();
}
计算类然后全部合并并等待:
// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
.toBlocking().lastOrDefault(null);
问题
问题是,processToNewObservable()
永远不会被执行。通过消除过程,我可以看到问题出在 getObservable1()
- 如果我用 Observable.just(null)
替换它,一切都会按照我想象的那样执行(但在我想要一个真实值的地方有一个空值)。
重申一下,getObservable1()
在生产代码中从 Jersey 客户端返回一个 Observable,但该客户端是一个 Mockito 模拟,在我的代码中返回 Observable.just(someValue)
测试。
调查
如果我将 getObservable1()
转换为阻塞,然后将第一个值包装在 just()
中,同样,一切都按照我的想象执行(但我不不想引入阻塞步骤):
Observable.zip(
Observable.just(getObservable1().toBlocking().first()),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
我的第一个想法是,也许其他东西正在消耗我的 observable 发出的值,而 zip
看到它已经完成,因此确定压缩它们的结果应该是一个空的 observable .我已经尝试将 .cache()
添加到我认为相关的每个可观察源上,但是,这并没有改变行为。
我还尝试在 zip 之前在 getObservable1 上添加 next/error/complete/finally 处理程序(不将其转换为阻塞),但它们都没有执行:
getObservable1()
.doOnNext(...)
.doOnCompleted(...)
.doOnError(...)
.finallyDo(...);
Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
问题
我是 RxJava 的新手,所以我很确定我错过了一些基本的东西。问题是:我能做什么蠢事?如果从我目前所说的来看这不是很明显,我可以做些什么来帮助诊断问题?
最佳答案
Observable 必须发射才能启动链。您必须将管道视为 Observable 发射时将发生什么的声明。
您没有分享实际观察到的内容,但 Observable.just() 导致 Observable 立即发出包装对象。
关于java - 为什么我的 RxJava Observable 除非阻塞,否则不会发出或完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34257631/