我有以下最小的(不是)工作示例:
import org.junit.Test;
import io.reactivex.Observable;
import com.google.common.util.concurrent.SettableFuture;
@Test
public void testTest() {
final Observable<Long> initialValues = Observable.fromArray(100L, 200L, 300L);
final SettableFuture<Long> future = SettableFuture.create();
final Observable<Long> newValues = Observable.fromFuture(future);
new Thread(
() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
future.set(400L);
}
).start();
initialValues.takeUntil(newValues).concatWith(newValues).subscribe(System.out::println, Throwable::printStackTrace);
}
我期待以下输出:
100
200
300
但是输出是
400
我的逻辑有问题吗?我正在尝试完成以下行为:
假设有 2 个可观察量:A、B。从 A 中获取值,直到 B 开始发布值,然后仅在 B 上切换。
最佳答案
fromFuture
是 blocking订阅它的线程,因此 initialValues
永远不会有机会运行,因为 takeUntil
首先订阅 newValues
然后被阻塞。试试这个:
initialValues
.takeUntil(newValues.subscribeOn(Schedulers.io()))
.concatWith(newValues)
.subscribe(System.out::println, Throwable::printStackTrace);
关于java - RXJava - 当其中之一的数据可用时永久切换可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50003380/