java - "More produced than requested"使用 `replay` 和 `autoConnect` 时出现异常

标签 java rx-java reactive-programming

我想创建一个可观察量,仅当新值与前一个值不同时,它才会从底层热可观察量(之前以 -1 开始)发出值。此外,我希望最新的值(value)能够立即发送给新订阅者。我想出了以下代码:

PublishSubject<Integer> hotObservable = PublishSubject.create();

Observable<Integer> observable = hotObservable
        .startWith(-1)
        .distinctUntilChanged()
        .replay(1)
        .autoConnect(0);

但是,在将第一个值(始终-1,无论hotObservable在订阅observable之前发出什么)发送给新订阅者后失败java.lang.IllegalStateException:产生的数量多于请求的数量 有趣的是,当我不自动连接,而是手动订阅时:

Observable<Integer> observable = hotObservable
        .startWith(-1)
        .distinctUntilChanged()
        .replay(1)
        .autoConnect();

observable.subscribe().unsubscribe();

以下订阅者正常工作,接收最后的值,然后更新。

我无法让 replay(1).autoConnect(0) 工作,而且我觉得我错过了一些东西 - 为什么订阅和取消订阅会起作用,而 autoConnect(0)不会吗?创建此类可观察的正确方法是什么?

除非我使用 autoConnect(),否则这是失败的测试方法; observable.subscribe().unsubscribe():

Observable<Integer> observable = hotObservable
        .startWith(-1)
        .distinctUntilChanged()
        .replay(1)
        .autoConnect(); // With (0) it fails

observable.subscribe().unsubscribe(); // Needed if we don't auto connnect

hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3); // I want this value to be received by new subscriber

TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);

subscriber.assertNoErrors();
subscriber.assertValues(3);

最佳答案

在 RxJava 1.1.3 上使用上述代码时,我没有收到生产的数量多于请求的数量错误。

断言失败的原因是,在任何订阅者实际请求之前,replay 不会向上游请求任何内容。如果 TestSubscriber 是第一个订阅的,它将触发 startWith 发出 -1,然后切换到 PublishSubject,它不保留任何值,因此您不会收到任何其他内容。

我相信您正在寻找的是 BehaviorSubject ,它保留最后的值并从新订阅者的值开始:

BehaviorSubject<Integer> hotObservable = BehaviorSubject.create(-1);

Observable<Integer> observable = hotObservable.distinctUntilChanged();

hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3);

TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);

subscriber.assertNoErrors();
subscriber.assertValue(3);

关于java - "More produced than requested"使用 `replay` 和 `autoConnect` 时出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36544843/

相关文章:

java - [JAVA]如何调用方法 "itemStateChanged"

java - 使用 Hibernate 执行 "IN"查询

java - 如何锁定线程直到 Observable 完成

javascript - 检测对象是否是 Stream 类实例的最佳方法是什么?

java - 无法启动 spring boot web 应用程序

java - 使用 XFire 通过 ssl 使用 Web 服务

java - @Schedule 在随机时刻执行

http - 使用 RxJava 使用 Async Jersey HTTP Client 限制传出的 HTTP 请求

java - 将标准 Android 代码重构为 Reactive 代码

java - Mono.subscriberContext() 返回旧上下文