使用 rx 实体时我有以下方案:
Observerable1
|
------------------
| | |
S1(*) S2(*) S3 (onCompleted() starts another observable)
|
Observable2
|
S4(*)
我需要知道所有带有 *
的订阅者何时完成其工作 (S1,S2,S4
)。由于它们可以在不同的线程中执行,我需要一些同步机制,或者 rx-java 有一些开箱即用的解决方案?
一些示例代码来说明当前的设计:
@Component
public class BatchReader {
@Autowired List<Subscriber<List<Integer>>> subscribers;
public void start() {
ConnectableObservable<List<Integer>> observable1 = createObservable();
subscribers.forEach(observable1::subscribe);
observable1.connect();
}
private ConnectableObservable<List<Integer>> createObservable() {
return Observable.create((Subscriber<? super List<Integer>> subscriber) -> {
try {
subscriber.onStart();
while (someCondition) {
List<Integer> numbers = ...;
subscriber.onNext(numbers);
}
subscriber.onCompleted();
} catch (Exception ex) {
subscriber.onError(ex);
}
}).observeOn(Schedulers.newThread()).publish();
}
}
在 S3 中我有以下逻辑:
@Component
public class S3 extends Subscriber<List<Integer>> {
@Autowired AnotherBatchReader anotherBatchReader;
....
@Override
public void onCompleted() {
anotherBatchReader.start();
}
...
}
S4 在 AnotherBatchReader 中订阅:
@Component
public class AnotherBatchReader {
@Autowired S4<List<Foo>> subscriber4;
public void start() {
Observable<List<Foo>> observable2 = createObservable();
observable2.subscribe(subscriber4);
}
private Observable<List<Foo>> createObservable() {
return Observable.create(subscriber -> {
try {
subscriber.onStart();
while (someConditionBar) {
List<Foo> foo = ...;
subscriber.onNext(foo);
}
subscriber.onCompleted();
}
} catch (RuntimeException ex) {
subscriber.onError(ex);
}
});
}
}
那么,当我感兴趣的所有订阅者完成工作时,有没有一种方法可以得到正确的通知? rx 是否支持开箱即用?或者也许有更好的设计来支持它?
编辑:
我有不同的订阅者,因为每个订阅者都有不同的行为。最后,带有 *
(S1,S2,S3) 的订阅者会将其数据写入 xml 文件。但是
- S1 在
onNext()
中接收数据,做一些工作,并将结果直接写入文件 - S2 在
onNext()
中接收数据,做一些工作,将结果累积到字段中,然后使用onCompleted
写入 - S3 在
onNext
中接收数据,做一些工作,将结果写入数据库,并在调用onCompleted
后启动另一个可观察对象,该可观察对象开始从数据库获取数据并将其推送到S4 - S4 在
onNext()
中接收数据,执行一些工作并写入文件
我需要将数据写入S3中的数据库的原因是因为onNext()
中接收到的数据生成的结果必须是唯一的,但因为我是批量获取数据来自 Observable1
我无法保证这种唯一性,所以 DB 会处理它。
当然,在 S3
中,我不能像在 S2
中那样做(将所有结果累加到内存中),因为 S3 中存在结果的乘法与S2相比显着。
最佳答案
感谢您的澄清。在我看来,明智地应用现有运算符将最大限度地减少您的代码。现在,我不知道所有细节,但你正在做的事情感觉很像这样:
Observable<T> observable1 = ...
.share();
Observable<?> S1 = S1(observable1);
Observable<?> S2 = S2(observable1);
Observable<?> S3 = S3(observable1);
Observable<?> S4 = defer(() -> readFromDatabase()).compose(S4::generate);
Observable.merge(S1,S2,S3.ignoreElements().switchIfEmpty(S4))
.ignoreElements()
.onComplete(...)
.subscribe();
当然,根据原始可观察量是热还是冷以及S[1-4]
的详细信息,细节会有所不同。
另外,不要尝试自己驱动所有订阅者
,让框架为您做这件事,您会从中获得更多 - 例如:
S4 = Observable.create(SyncOnSubscribe.generateStateless(
observer -> observer.onNext(<get List<Foo>>)
))
.takeWhile(list -> someConditionBar);
编辑:这是 XY 问题的一个案例 - 我们都经历过它......
关于java - 如何检查所有需要的订阅者是否完成了工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39470436/