java - RxJava。 Observable.delay 工作奇怪(最后缺少一些项目)

标签 java rx-java

我正在尝试理解 RxJava。我的测试代码是:

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

import java.util.concurrent.TimeUnit;

public class Hello {
    public static void main(String[] args) {

            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    Thread.sleep(1000);
                    subscriber.onNext("a");
                    Thread.sleep(1000);
                    subscriber.onNext("b");
                    Thread.sleep(1000);
                    subscriber.onNext("c");
                    Thread.sleep(1000);
                    subscriber.onNext("d");
                    Thread.sleep(1000);
                    subscriber.onNext("e");
                    Thread.sleep(1000);
                    subscriber.onNext("f");
                    Thread.sleep(1000);
                    subscriber.onNext("g");
                    Thread.sleep(1000);
                    subscriber.onNext("h");
                } catch (InterruptedException e) {
                    subscriber.onError(e);
                }
            }
        });

        observable
                .delay(2, TimeUnit.SECONDS)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String string) {
                        System.out.println(string);
                    }
                });
    }
}

没有 .delay(2, TimeUnit.SECONDS) 我有输出:a b C d 电子 F G H 但是 .delay(2, TimeUnit.SECONDS) 输出缺少“g”和“h”: A b C d 电子 f

怎么可能呢?文档说 delay 只是发出由源 Observable 发出的项目,并按指定的延迟时间向前移动

最佳答案

您正在使用的 delay 重载调度在不同的线程上工作并导致隐式竞争条件。所有时间运算符(例如 delaybuffer window) 需要使用调度程序来安排稍后的效果,如果您没有意识到并小心使用它们,这可能会导致意外的竞争条件。在这种情况下,延迟运算符将下游工作安排在单独的线程池上。这是测试中的执行顺序(在主线程上)。

  1. 您的 Observable 已订阅并在 onNext("a")
  2. 之前等待 1000 毫秒
  3. 接下来它被延迟接收。这会在 2 秒后安排下游 onNext。
  4. 控制流立即返回到等待 1000 毫秒的可观察对象。
  5. Observable onNext("b") 延迟。 Delay 将 "b"的 onNext 安排在 2 秒后。
  6. ....(重复)
  7. 当您的 observable 调用 onNext("h") 时,它会安排工作,然后立即从订阅返回并终止您的测试(导致安排的工作消失)。

为了让它异步执行,您可以在 trampoline 调度程序实现上安排延迟。

.delay(2, TimeUnit.SECONDS, Schedulers.trampoline())

关于java - RxJava。 Observable.delay 工作奇怪(最后缺少一些项目),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32915351/

相关文章:

java - mybatis。 java.lang.UnsupportedOperationException异常

java - RxJava : chaining results from map methods

observable - RxJava 2 Flowable 是热的还是冷的?

android - RXJava Android - 创建另一个可观察对象所需的可观察结果

java - RxJava 中的指数退避

Javafx 部署失败。在指定位置找不到 JDK 工件 : C:\Program Files\Java\jre1. 8.0

java - 在 Maven 构建期间从 Maven 依赖项 jar 创建类列表

java - JFrame : 'Random' size?

Java 数组自定义函数

java - StringObservable.from(InputStream).share() 立即导致 MissingBackPressure