java - RxJava : how to handle combineLatest() when one of the streams emits nothing

标签 java android rx-java rx-java2 rx-android

我使用 combineLatest() 组合 3 个可观察数据流。所有这些组合在一起,以便同时显示 UI 中的所有数据。现在,有一种情况,其中一个可观察对象不会发出任何东西,因为获取的数据可能为空。

是否有 RxJava 运算符让订阅者知道不会因为空数据而发出任何消息?

编辑

private fun retrieveData() {
    Observable.combineLatest(getCurrentUser.execute(), getLatestGoal.execute(), getLatestLog.execute(),
            Function3<User, Goal, Log, PersonalViewModel> { user, goal, log -> mapToViewModel(user, goal, log) })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe { /*todo: animation*/ }
            .doOnNext { view.setViewModel(it) }
            .doOnComplete { view.stopLoading() }
            .doOnError { /*todo: error message*/ }
            .subscribe()
}

第三个流:当用户有 nog 日志时,getLatestLog.execute() 什么也不发出。当此流不发出时,整个 View 将不可见。

数据是从 FireBase 实时数据库中获取的。 ChildEventListener 有一个如下所示的方法:

override fun onChildAdded(dataSnapshot: DataSnapshot?, p1: String?) {
                val log = dataSnapshot?.getValue(Log::class.java)
                log?.let { subscriber.onNext(it) }
                subscriber.onComplete()
                firebaseDatabase.reference.removeEventListener(this)
            }

最佳答案

如果您手头有 Java8 或一些 Optionals,您可以使用这个结构:

  @Test
  void name() {
    TestScheduler scheduler = new TestScheduler();

    Observable<Optional<Integer>> o1$ =
        Observable.just(Optional.ofNullable(4)).mergeWith(Observable.never());
    Observable<Optional<Integer>> o2$ =
        Observable.just(Optional.ofNullable(2)).mergeWith(Observable.never());

    Observable<Optional<Integer>> o3$ =
        Observable.<Optional<Integer>>never()
            .timeout(1000, TimeUnit.MILLISECONDS, scheduler)
            .onErrorResumeNext(
                throwable -> {
                  return Observable.<Optional<Integer>>never()
                      .mergeWith(Observable.just(Optional.empty()));
                });

    Observable<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> result =
        Observable.combineLatest(
                o1$,
                o2$,
                o3$,
                (integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
            .filter(t -> t._1.isPresent() && t._2.isPresent() && t._3.isPresent());

    TestObserver<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> test =
        result.test();

    scheduler.advanceTimeTo(10000, TimeUnit.SECONDS);

    test.assertNotComplete().assertNoErrors().assertNoValues();
  }

你可能不会,空值不允许通过 observables-pipelines 发出。因此,我们需要一些其他构造来表示 null。在 Java8 中有一个名为 Optional 的构造(vavr 称它为 Option -> 也称为 Java8)。

在这个例子中,o3$-Observable 不会发射任何东西。它也可能出错,也许这更像你的情况。我们将捕获错误(在这种情况下:超时异常)并返回一个带有 Optional.empty 的 Observable。

在组合回调中,我们组合了所有三个值。在后面的步骤中,我们过滤掉所有具有有效值的元组(具有值的可选)。

只有当所有三个值都用一个值发出时,您才会发出一个值。

当您不能使用 Optional-class 时,您还可以定义一个 INVALID-Object,如下例所示:

class So51217041 {
  private static Integer INVALID_VALUE = 42;

  @Test
  void name() {
    Observable<Integer> o1$ = Observable.just(4).mergeWith(Observable.never());
    Observable<Integer> o2$ = Observable.just(2).mergeWith(Observable.never());

    Observable<Integer> o3$ =
        Observable.<Integer>never()
            .onErrorResumeNext(
                throwable -> {
                  return Observable.<Integer>never().mergeWith(Observable.just(INVALID_VALUE));
                });

    Observable<Tuple3<Integer, Integer, Integer>> result =
        Observable.combineLatest(
                o1$,
                o2$,
                o3$,
                (integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
            .filter(t -> t._3 != INVALID_VALUE); // yeah I know, I want to compare reference, not the content

    TestObserver<Tuple3<Integer, Integer, Integer>> test = result.test();

    test.assertNotComplete().assertNoErrors().assertNoValues();
  }
}

此外,当您希望流以 INVALID 或 NULL 开始时,CombineLatest 至少发出一个值,您可以使用 Observable#startWith(INVALID) 或 Observable#startWith(Optional.empty())。这将保证可观察对象至少会发出一个值。

关于java - RxJava : how to handle combineLatest() when one of the streams emits nothing,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51217041/

相关文章:

java - 如何在 eclipse e4 中将值从一个 View 发送到另一 View ?

android - Android 应用程序中数据库 Assets 的正确路径

android - 将 recyclerview 中的图片添加到新 Activity 中

android - Kotlin - 使用 RxJava 的改造请求给出空响应

java - Observable OnCompleted 无法更新 UI

java - 从正则表达式获取变量

java - 黑莓手机无法上传大于1KB的文件

检查 Integer 值是否为 null 时发生 Java 异常

java - 是否可以针对不同的屏幕尺寸定义不同的样式

android - 使用 RxJava 获取对象,转换包含列表,并使用列表