java - RxJava 的扫描运算符能否跨多个订阅者保持状态

标签 java rx-java

我有一个 observable,它使用 scan 运算符进行转换,因此它总是根据当前值和先前值(或初始值)发出一个值。

Subject<String> subject = PublishSubject.create();
Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);

然后,首先观察者订阅,打印所有发出的值,一段时间后取消订阅。

Disposable first = observable.subscribe(System.out::println); // "zero"
subject.onNext("one"); // "zero, one"
first.dispose();

稍后,另一个订阅者订阅:

Disposable second = observable.subscribe(System.out::println); // "zero"
subject.onNext("two"); // "zero, one"
second.dispose();

如您所见,每个观察者首先会收到初始值,而前一个观察者订阅时发出的值会消失。我想要实现的是在所有订阅中将状态保持在 scan 运算符中:

Disposable first = observable.subscribe(System.out::println); // "zero"
subject.onNext("one"); // "zero, one"
first.dispose();
Disposable second = observable.subscribe(System.out::println); // "zero, one"
subject.onNext("two"); // "zero, one, two"
second.dispose();

在 RxJava 中有解决这个问题的方法吗?

最佳答案

另一种解决方案:

observable#publish 会将结果多播给所有订阅者,因此保留最后一个#scan 值。您的解决方案不起作用,因为您每次订阅它时都会创建一个新的 Observable。扫描值未保留。

@Test
void name3231() {
    Subject<String> subject = PublishSubject.create();
    Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);

    ConnectableObservable<String> observableMulticast = observable.publish();

    Disposable connect = observableMulticast.connect(); // must be disposed by hand

    TestObserver<String> test = observableMulticast.test();

    Disposable first = observableMulticast.subscribe(System.out::println); // "zero"
    subject.onNext("one"); // "zero, one"
    first.dispose();

    test.assertValues("zero, one");

    Disposable second = observableMulticast.subscribe(System.out::println); // "zero, one"
    subject.onNext("two"); // "zero, one, two"
    second.dispose();

    test.assertValues("zero, one", "zero, one, two");
}

关于java - RxJava 的扫描运算符能否跨多个订阅者保持状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46999610/

相关文章:

java - 如何从 Eclipse 外部驱动 Eclipse?

java - 如何在 Angular JS中发送java对象

java - 如何在 RXJava/Android 中连接两个订阅

java - RXJava 2 多个订阅者轮询

android - 停止 Observable.interval 并发送 onComplete

java - ImageView 不显示在以图层列表为背景的 LinearLayout 内

java - 无法写入文件扫描仪 try/catch Java

java - 使用 JAXB 合并多个文件

java - 你什么时候在 RxJava 中使用 map 和 flatMap?

android - Android 中带有 RetroFit2 的 ReactiveLocationProvider 抛出 NetworkOnMainThread