java - 如何检查所有需要的订阅者是否完成了工作?

标签 java spring rx-java reactive-programming

使用 rx 实体时我有以下方案:

    Observerable1
        |
------------------
|       |         |   
S1(*)  S2(*)     S3 (onCompleted() starts another observable)
                  |
              Observable2
                  |
                 S4(*)

我需要知道所有带有 * 的订阅者何时完成其工作 (S1,S2,S4)。由于它们可以在不同的线程中执行,我需要一些同步机制,或者 rx-java 有一些开箱即用的解决方案?

一些示例代码来说明当前的设计:

@Component
public class BatchReader {
    @Autowired List<Subscriber<List<Integer>>> subscribers;

    public void start() {
        ConnectableObservable<List<Integer>> observable1 = createObservable();
        subscribers.forEach(observable1::subscribe);
        observable1.connect();
    }

    private ConnectableObservable<List<Integer>> createObservable() {
        return Observable.create((Subscriber<? super List<Integer>> subscriber) -> {
            try {    
                subscriber.onStart();

                while (someCondition) {
                    List<Integer> numbers = ...;
                    subscriber.onNext(numbers);
                }

                subscriber.onCompleted();
            } catch (Exception ex) {
                subscriber.onError(ex);
            }
        }).observeOn(Schedulers.newThread()).publish();
    }
}

在 S3 中我有以下逻辑:

@Component
public class S3 extends Subscriber<List<Integer>> {
    @Autowired AnotherBatchReader anotherBatchReader;
    ....
    @Override
    public void onCompleted() {
        anotherBatchReader.start();
    }
    ...
}

S4 在 AnotherBatchReader 中订阅:

@Component
public class AnotherBatchReader {
    @Autowired S4<List<Foo>> subscriber4;

    public void start() {
        Observable<List<Foo>> observable2 = createObservable();
        observable2.subscribe(subscriber4);
    }

    private Observable<List<Foo>> createObservable() {
        return Observable.create(subscriber -> {
            try {
                    subscriber.onStart();
                    while (someConditionBar) {
                        List<Foo> foo = ...;
                        subscriber.onNext(foo);
                    }
                    subscriber.onCompleted();
                }
            } catch (RuntimeException ex) {
                subscriber.onError(ex);
            }
        });
    }
}

那么,当我感兴趣的所有订阅者完成工作时,有没有一种方法可以得到正确的通知? rx 是否支持开箱即用?或者也许有更好的设计来支持它?

编辑: 我有不同的订阅者,因为每个订阅者都有不同的行为。最后,带有 * (S1,S2,S3) 的订阅者会将其数据写入 xml 文件。但是

  • S1 在 onNext() 中接收数据,做一些工作,并将结果直接写入文件
  • S2 在 onNext() 中接收数据,做一些工作,将结果累积到字段中,然后使用 onCompleted 写入
  • S3 在 onNext 中接收数据,做一些工作,将结果写入数据库,并在调用 onCompleted 后启动另一个可观察对象,该可观察对象开始从数据库获取数据并将其推送到S4
  • S4 在 onNext() 中接收数据,执行一些工作并写入文件

我需要将数据写入S3中的数据库的原因是因为onNext()中接收到的数据生成的结果必须是唯一的,但因为我是批量获取数据来自 Observable1 我无法保证这种唯一性,所以 DB 会处理它。

当然,在 S3 中,我不能像在 S2 中那样做(将所有结果累加到内存中),因为 S3 中存在结果的乘法与S2相比显着。

最佳答案

感谢您的澄清。在我看来,明智地应用现有运算符将最大限度地减少您的代码。现在,我不知道所有细节,但你正在做的事情感觉很像这样:

Observable<T> observable1 = ...
        .share();

Observable<?> S1 = S1(observable1);
Observable<?> S2 = S2(observable1);
Observable<?> S3 = S3(observable1);
Observable<?> S4 = defer(() -> readFromDatabase()).compose(S4::generate);

Observable.merge(S1,S2,S3.ignoreElements().switchIfEmpty(S4))
    .ignoreElements()
    .onComplete(...)
    .subscribe();

当然,根据原始可观察量是热还是冷以及S[1-4]的详细信息,细节会有所不同。

另外,不要尝试自己驱动所有订阅者,让框架为您做这件事,您会从中获得更多 - 例如:

S4 = Observable.create(SyncOnSubscribe.generateStateless(
       observer -> observer.onNext(<get List<Foo>>)
   ))
   .takeWhile(list -> someConditionBar);

编辑:这是 XY 问题的一个案例 - 我们都经历过它......

关于java - 如何检查所有需要的订阅者是否完成了工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39470436/

相关文章:

java - 在播放期间更改样本的属性

java - 将 Double 转换为 ArrayList - Android

java - 在一行上无序地声明多个变量

rx-java - 如何使用 RxJava 和 Kotlin 进行 groupBy 和收集?

java - Xuggler - 新 Runnable 中的视频没有愿景且无法结束

java - spring什么时候可以使用cglib来创建代理了?

spring - 有什么方法可以阻止springfox Swagger 地扫描模型类吗?

android - 何时在 rxjava 中使用 `unSubscribeOn`

android - onNext 和 onComplete 的行为

java - Spring Boot - JpaRepository 在使用多个数据源时未初始化