我正在尝试理解 RxJava。我的测试代码是:
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import java.util.concurrent.TimeUnit;
public class Hello {
public static void main(String[] args) {
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
Thread.sleep(1000);
subscriber.onNext("a");
Thread.sleep(1000);
subscriber.onNext("b");
Thread.sleep(1000);
subscriber.onNext("c");
Thread.sleep(1000);
subscriber.onNext("d");
Thread.sleep(1000);
subscriber.onNext("e");
Thread.sleep(1000);
subscriber.onNext("f");
Thread.sleep(1000);
subscriber.onNext("g");
Thread.sleep(1000);
subscriber.onNext("h");
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
});
observable
.delay(2, TimeUnit.SECONDS)
.subscribe(new Action1<String>() {
@Override
public void call(String string) {
System.out.println(string);
}
});
}
}
没有 .delay(2, TimeUnit.SECONDS)
我有输出:a
b
C
d
电子
F
G
H
但是 .delay(2, TimeUnit.SECONDS)
输出缺少“g”和“h”:
A
b
C
d
电子
f
怎么可能呢?文档说 delay 只是发出由源 Observable 发出的项目,并按指定的延迟时间向前移动
最佳答案
您正在使用的 delay
重载调度在不同的线程上工作并导致隐式竞争条件。所有时间运算符(例如 delay
、buffer
和 window
) 需要使用调度程序来安排稍后的效果,如果您没有意识到并小心使用它们,这可能会导致意外的竞争条件。在这种情况下,延迟运算符将下游工作安排在单独的线程池上。这是测试中的执行顺序(在主线程上)。
- 您的 Observable 已订阅并在
onNext("a")
之前等待 1000 毫秒
- 接下来它被延迟接收。这会在 2 秒后安排下游 onNext。
- 控制流立即返回到等待 1000 毫秒的可观察对象。
- Observable
onNext("b")
延迟。 Delay 将 "b"的 onNext 安排在 2 秒后。 - ....(重复)
- 当您的 observable 调用
onNext("h")
时,它会安排工作,然后立即从订阅返回并终止您的测试(导致安排的工作消失)。
为了让它异步执行,您可以在 trampoline 调度程序实现上安排延迟。
.delay(2, TimeUnit.SECONDS, Schedulers.trampoline())
关于java - RxJava。 Observable.delay 工作奇怪(最后缺少一些项目),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32915351/