java - RXJava - 当其中之一的数据可用时永久切换可观察对象

标签 java reactive-programming rx-java2

我有以下最小的(不是)工作示例:

import org.junit.Test;
import io.reactivex.Observable;
import com.google.common.util.concurrent.SettableFuture;

@Test
public void testTest() {
    final Observable<Long> initialValues = Observable.fromArray(100L, 200L, 300L);
    final SettableFuture<Long> future = SettableFuture.create();
    final Observable<Long> newValues = Observable.fromFuture(future);

    new Thread(
            () -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                future.set(400L);
            }
    ).start();

    initialValues.takeUntil(newValues).concatWith(newValues).subscribe(System.out::println, Throwable::printStackTrace);
}

我期待以下输出:

100

200

300

但是输出是

400

我的逻辑有问题吗?我正在尝试完成以下行为:

假设有 2 个可观察量:A、B。从 A 中获取值,直到 B 开始发布值,然后仅在 B 上切换。

最佳答案

fromFutureblocking订阅它的线程,因此 initialValues 永远不会有机会运行,因为 takeUntil 首先订阅 newValues 然后被阻塞。试试这个:

initialValues
.takeUntil(newValues.subscribeOn(Schedulers.io()))
.concatWith(newValues)
.subscribe(System.out::println, Throwable::printStackTrace);

关于java - RXJava - 当其中之一的数据可用时永久切换可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50003380/

相关文章:

java - 组件类中的 Autowiring bean 为 null

java - 如何设置环境变量以使javac能够找到导入的包?

c# - 如何获取 N 个热 Observable<decimal> 实例的 "last"项的总和?

android - Rxjava2、Retrofit2 - 在两个类之间传输数据。

没有数据绑定(bind)的android MVVM

java - 可以从作为参数传递的类创建对象吗?

java - 计算器不会接受用户输入

java - rxjava 中的异常处理

angular - 使用 Observables,延迟后显示加载指示器,但如果加载及时完成则取消?

android - 与 Room 和 LiveData 的多对多关系