我想创建一个可观察量,仅当新值与前一个值不同时,它才会从底层热可观察量(之前以 -1
开始)发出值。此外,我希望最新的值(value)能够立即发送给新订阅者。我想出了以下代码:
PublishSubject<Integer> hotObservable = PublishSubject.create();
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(0);
但是,在将第一个值(始终-1
,无论hotObservable
在订阅observable
之前发出什么)发送给新订阅者后失败java.lang.IllegalStateException:产生的数量多于请求的数量
有趣的是,当我不自动连接,而是手动订阅时:
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect();
observable.subscribe().unsubscribe();
以下订阅者正常工作,接收最后的值,然后更新。
我无法让 replay(1).autoConnect(0)
工作,而且我觉得我错过了一些东西 - 为什么订阅和取消订阅会起作用,而 autoConnect(0)不会吗?创建此类可观察的正确方法是什么?
除非我使用 autoConnect(),否则这是失败的测试方法; observable.subscribe().unsubscribe():
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(); // With (0) it fails
observable.subscribe().unsubscribe(); // Needed if we don't auto connnect
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3); // I want this value to be received by new subscriber
TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertValues(3);
最佳答案
在 RxJava 1.1.3 上使用上述代码时,我没有收到生产的数量多于请求的数量
错误。
断言失败的原因是,在任何订阅者实际请求之前,replay
不会向上游请求任何内容。如果 TestSubscriber
是第一个订阅的,它将触发 startWith 发出 -1,然后切换到 PublishSubject,它不保留任何值,因此您不会收到任何其他内容。
我相信您正在寻找的是 BehaviorSubject
,它保留最后的值并从新订阅者的值开始:
BehaviorSubject<Integer> hotObservable = BehaviorSubject.create(-1);
Observable<Integer> observable = hotObservable.distinctUntilChanged();
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3);
TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertValue(3);
关于java - "More produced than requested"使用 `replay` 和 `autoConnect` 时出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36544843/