java - 使用 RxJava fork 任务并合并结果

标签 java concurrency rx-java rx-java2

我正在尝试执行以下操作:

                           A
                           |
                           |
                           V
                     Observable<B>
                           /\
                          /  \
                         /    \
                        V      V
            Observable<C>       Observable<D>
                        \      /
                         \    /
                          V  V
                      Observable<E>
  1. 给定输入 [A],异步调用返回 [B]。
  2. 两个各自需要[B]的任务需要并行运行,分别返回[C]和[D]。
  3. 这两个结果合并为 [E],然后显示在 UI 中。

我是RxJava的新手,遇到过zip、merge等,但是不太明白这类问题需要什么操作符。任何帮助将不胜感激。

附言。 1) 虽然 [C] 和 [D] 都是必需的,但 [E] 仍然可以只用其中之一创建。因此,如果其中一个(或两个)失败,最好在此时设置超时。
2) 是否可以让它们在特定线程中运行 - 一个在 computation() 中,另一个在 io() 中?

这是我目前拥有的概念代码。我这样做是线性的:
A -> B -> C -> D -> E

    return a2b(a)
            .subscribeOn(Schedulers.io())
            .flatMap(this::b2c)
            .subscribeOn(Schedulers.computation())
            .map(this::c2d)
            .map(this::d2e)
            .cast(E.class)
            .startWith(e -> new E.loadingState());

理想情况下,我应该在某处使用以下函数:

Observable<E> cd2e(C c, D d) {
    return Observable.just(new E());
}

谢谢。

最佳答案

publish() 运算符以允许多个订阅的方式绑定(bind)单个可观察对象。

return a2b(a)
        .subscribeOn(Schedulers.io())
        .publish( bObservable -> 
               Observable.zip( bObservable.map( b -> this::b2c ),
                               bObservable.map( b -> this::b2d ),
                               (c, d) -> combine( c, d ) )
        .subscribe( ... );

运营商绑定(bind)观察者链,以便可以进行多个订阅;在这种情况下,订阅被压缩在一起,将 CD 类型组合成组合的 E 类型。

然后您可以自由添加 observeOn() 运算符以在您想要的线程上完成计算。

关于java - 使用 RxJava fork 任务并合并结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48015796/

相关文章:

java - 具有多个观察者的 Android Rx 生命周期

java - 导出 Spring Boot JAR 以进行远程部署

java - 如何使用 URLConnection 上传 WAV 文件

swift - 如果队列不为空,则跳过新任务。 swift

c# - 在多线程场景中从 List<T> 添加/删除特定项目的最佳方法是什么

angular - 如果我发现自己一直在订阅另一个订阅中的内容,我应该怎么做?

java - 将类路径添加到 build.xml 的最佳方法

Java - 基于旧字符串和滞后创建新字符串

Haskell 推测并行执行

android - 如何在 rxJava 中手动调用 observer.onNext