我使用 combineLatest() 组合 3 个可观察数据流。所有这些组合在一起,以便同时显示 UI 中的所有数据。现在,有一种情况,其中一个可观察对象不会发出任何东西,因为获取的数据可能为空。
是否有 RxJava 运算符让订阅者知道不会因为空数据而发出任何消息?
编辑
private fun retrieveData() {
Observable.combineLatest(getCurrentUser.execute(), getLatestGoal.execute(), getLatestLog.execute(),
Function3<User, Goal, Log, PersonalViewModel> { user, goal, log -> mapToViewModel(user, goal, log) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { /*todo: animation*/ }
.doOnNext { view.setViewModel(it) }
.doOnComplete { view.stopLoading() }
.doOnError { /*todo: error message*/ }
.subscribe()
}
第三个流:当用户有 nog 日志时,getLatestLog.execute() 什么也不发出。当此流不发出时,整个 View 将不可见。
数据是从 FireBase 实时数据库中获取的。 ChildEventListener 有一个如下所示的方法:
override fun onChildAdded(dataSnapshot: DataSnapshot?, p1: String?) {
val log = dataSnapshot?.getValue(Log::class.java)
log?.let { subscriber.onNext(it) }
subscriber.onComplete()
firebaseDatabase.reference.removeEventListener(this)
}
最佳答案
如果您手头有 Java8 或一些 Optionals,您可以使用这个结构:
@Test
void name() {
TestScheduler scheduler = new TestScheduler();
Observable<Optional<Integer>> o1$ =
Observable.just(Optional.ofNullable(4)).mergeWith(Observable.never());
Observable<Optional<Integer>> o2$ =
Observable.just(Optional.ofNullable(2)).mergeWith(Observable.never());
Observable<Optional<Integer>> o3$ =
Observable.<Optional<Integer>>never()
.timeout(1000, TimeUnit.MILLISECONDS, scheduler)
.onErrorResumeNext(
throwable -> {
return Observable.<Optional<Integer>>never()
.mergeWith(Observable.just(Optional.empty()));
});
Observable<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> result =
Observable.combineLatest(
o1$,
o2$,
o3$,
(integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
.filter(t -> t._1.isPresent() && t._2.isPresent() && t._3.isPresent());
TestObserver<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> test =
result.test();
scheduler.advanceTimeTo(10000, TimeUnit.SECONDS);
test.assertNotComplete().assertNoErrors().assertNoValues();
}
你可能不会,空值不允许通过 observables-pipelines 发出。因此,我们需要一些其他构造来表示 null。在 Java8 中有一个名为 Optional 的构造(vavr 称它为 Option -> 也称为 Java8)。
在这个例子中,o3$-Observable 不会发射任何东西。它也可能出错,也许这更像你的情况。我们将捕获错误(在这种情况下:超时异常)并返回一个带有 Optional.empty 的 Observable。
在组合回调中,我们组合了所有三个值。在后面的步骤中,我们过滤掉所有具有有效值的元组(具有值的可选)。
只有当所有三个值都用一个值发出时,您才会发出一个值。
当您不能使用 Optional-class 时,您还可以定义一个 INVALID-Object,如下例所示:
class So51217041 {
private static Integer INVALID_VALUE = 42;
@Test
void name() {
Observable<Integer> o1$ = Observable.just(4).mergeWith(Observable.never());
Observable<Integer> o2$ = Observable.just(2).mergeWith(Observable.never());
Observable<Integer> o3$ =
Observable.<Integer>never()
.onErrorResumeNext(
throwable -> {
return Observable.<Integer>never().mergeWith(Observable.just(INVALID_VALUE));
});
Observable<Tuple3<Integer, Integer, Integer>> result =
Observable.combineLatest(
o1$,
o2$,
o3$,
(integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
.filter(t -> t._3 != INVALID_VALUE); // yeah I know, I want to compare reference, not the content
TestObserver<Tuple3<Integer, Integer, Integer>> test = result.test();
test.assertNotComplete().assertNoErrors().assertNoValues();
}
}
此外,当您希望流以 INVALID 或 NULL 开始时,CombineLatest 至少发出一个值,您可以使用 Observable#startWith(INVALID) 或 Observable#startWith(Optional.empty())。这将保证可观察对象至少会发出一个值。
关于java - RxJava : how to handle combineLatest() when one of the streams emits nothing,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51217041/