java - RxJava : Dynamic set of observables

标签 java rx-java

我有一个中心类,称之为Central。它可以添加 1-N 个可观察量。我需要动态添加这些,然后知道最终的 onComplete() 何时执行。

如何做到这一点?

代码示例:

public class Central {
  public void addObservable(Observable o){
    // Add the new observable to subscriptions
  }
}
<小时/>

更新。

我已经研究这个问题有一段时间了。使用@DaveMoten的答案,我已经接近了。

这是我随意添加新可观察对象的方法,并在所有观察对象完成时收到通知(伪代码):

class central {
  PublishSubject<Observable<T>> subject = PublishSubject.create();
  int commandCount = 0;
  int concatCount = 0;
  addObservable(Observable newObs){
    commandCount++;
    concatCount++;
    subject.concatMap(o -> 
          o.doOnCompleted(
                  () -> { 
                    concatCount--;
                    LOG.warn("inner completed, concatCount: " + concatCount);
                  })
          ).doOnNext(System.out::println)
            .doOnCompleted(() -> { 
                System.out.println("OUTER completed");
            } ) .subscribe();

    onNext(newObs);
    subject.onCompleted(); // If this is here, it ends prematurely (after first onComplete)
    // if its not, OUTER onComplete never happens

    newObs.subscribe((e) ->{},
    (error) -> {},
    () -> {
        commandCount--;
        LOG.warn("inner end: commandCount: " + commandCount);
    });
  }
}
// ... somewhere else in the app:
Observable ob1 = Observable.just(t1);
addObservable.addObservable(ob1);
// ... and possibly somewhere else:
Observable ob2 = Observable.just(t2, t3);
addObservable(ob2);

// Now, possibly somewhere else:
ob1.onNext(1);
// Another place:
ob2.onNext(2);

日志记录如下所示:

19:51:16.248 [main] WARN  execute: commandCount: 1
19:51:17.340 [main] WARN  execute: commandCount: 2
19:51:23.290 [main] WARN  execute: commandCount: 3
9:51:26.969 [main] WARN   inner completed, concatCount: 2
19:51:27.004 [main] WARN  inner end: commandCount: 2
19:51:27.008 [main] WARN  inner completed, concatCount: 1
19:51:27.009 [main] WARN  inner end: commandCount: 1
19:51:51.745 [ProcessIoCompletion0] WARN  inner completed, concatCount: 0
19:51:51.750 [ProcessIoCompletion0] WARN  inner completed, concatCount: -1
19:51:51.751 [ProcessIoCompletion0] WARN  inner end: commandCount: 0
19:51:51.752 [ProcessIoCompletion0] WARN  inner completed, concatCount: -2
19:51:51.753 [ProcessIoCompletion0] WARN  inner completed, concatCount: -3

更新:我添加了一些计数器,这表明我不明白 concatMap 发生了什么。您可以看到观察者本身的 lambda 订阅正确倒数到 0,但 concatMap oncomplete 下降到 -3!并且外部完整永远不会发生。

最佳答案

使用PublishSubject:

PublishSubject<Observable<T>> subject = 
    PublishSubject.create();
subject
    // protect against concurrent calls to subject (optional)
    .serialize()
    .concatMap(o -> 
      o.doOnCompleted(() -> System.out.println("inner completed")))
    .doOnNext(System.out::println)
    .doOnCompleted(() -> System.out.println("completed"))
    .subscribe(subscriber);

subject.onNext(Observable.just(t1));
subject.onNext(Observable.just(t2, t3));
subject.onCompleted();

  .

关于java - RxJava : Dynamic set of observables,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39938933/

相关文章:

java - 编写 jax-ws web 服务并生成无 XSD 的 WSDL

java - 使用 apache commons io 更改文件的编码不起作用?

android - 在 RxJava 中观察一个基本的自定义事件

android - RxJavaCallAdapterFactory 无法转换为 Factory

rx-java - 接收超时操作符

java - 在 Liferay 5 中获取文章的类别

Java 8 增强了带有索引/范围的 for 循环

android - 在 doAsync 结果中调用方法 - 主线程上的工作太多

java - 如何根据一组值更新数据模型

java - 操作后格式化bigdecimal数