我有一个 observable,它使用 scan
运算符进行转换,因此它总是根据当前值和先前值(或初始值)发出一个值。
Subject<String> subject = PublishSubject.create();
Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);
然后,首先观察者订阅,打印所有发出的值,一段时间后取消订阅。
Disposable first = observable.subscribe(System.out::println); // "zero"
subject.onNext("one"); // "zero, one"
first.dispose();
稍后,另一个订阅者订阅:
Disposable second = observable.subscribe(System.out::println); // "zero"
subject.onNext("two"); // "zero, one"
second.dispose();
如您所见,每个观察者首先会收到初始值,而前一个观察者订阅时发出的值会消失。我想要实现的是在所有订阅中将状态保持在 scan
运算符中:
Disposable first = observable.subscribe(System.out::println); // "zero"
subject.onNext("one"); // "zero, one"
first.dispose();
Disposable second = observable.subscribe(System.out::println); // "zero, one"
subject.onNext("two"); // "zero, one, two"
second.dispose();
在 RxJava 中有解决这个问题的方法吗?
最佳答案
另一种解决方案:
observable#publish 会将结果多播给所有订阅者,因此保留最后一个#scan 值。您的解决方案不起作用,因为您每次订阅它时都会创建一个新的 Observable。扫描值未保留。
@Test
void name3231() {
Subject<String> subject = PublishSubject.create();
Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);
ConnectableObservable<String> observableMulticast = observable.publish();
Disposable connect = observableMulticast.connect(); // must be disposed by hand
TestObserver<String> test = observableMulticast.test();
Disposable first = observableMulticast.subscribe(System.out::println); // "zero"
subject.onNext("one"); // "zero, one"
first.dispose();
test.assertValues("zero, one");
Disposable second = observableMulticast.subscribe(System.out::println); // "zero, one"
subject.onNext("two"); // "zero, one, two"
second.dispose();
test.assertValues("zero, one", "zero, one, two");
}
关于java - RxJava 的扫描运算符能否跨多个订阅者保持状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46999610/