java - 用于组合映射的 RxJava 组合器?

标签 java rx-java rx-java2

我有一个Map像这样的任务:

Map<Identifier, Observable<Progress>> tasks;

我想将其转换为:

Observable<Map<Identifier, Progress>>

其中每个 Map<Identifier, Progress> element包含每个任务的最新元素。

我想要新的Observable仅当所有任务完成时才完成,当任意一个任务失败时失败。

有一个组合器吗?

最佳答案

要改造单例Map<Identifier, Observable<Progress>> task<Identifier, Progress>使用:

Observable.just(Identifier)
    .flatMap(Observable<Progress> progressObservable, 
        (Identifier, Progress, Pair<Identifier, Progress>) -> {create pair of elements})

要将这些任务组合在一起,请使用 combineLatest运算符

Observable.combineLatest(List<Observable<Pair<Identifier, Progress>>>, 
    (List<Pair<Identifier, Progress>>) -> {merge into `Map`})

我希望我能很好地理解你的想法。当然,用伪代码编写。

<小时/>

实现:

public static <T, S> Observable<Map<T, S>> zipMaps(final Map<T, Observable<S>> tasks) {
    Objects.requireNonNull(tasks, "tasks is null");
    return Observable.combineLatest(
        tasks.entrySet()
            .stream()
            .map(entry -> Observable.combineLatest(
                Observable.just(entry.getKey()),
                entry.getValue(),
                Pair::with))
            .collect(ImmutableList.toImmutableList()),
        xs -> Arrays.stream(xs)
            .map(x -> (Pair<T, S>)x)
            .collect(ImmutableMap.toImmutableMap(Pair::getValue0, Pair::getValue1)));
}

关于java - 用于组合映射的 RxJava 组合器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44441762/

相关文章:

android - 关于为与 Room 中的另一个实体具有一对多关系的实体插入记录的问题

android - RxJava 将 Single 与 Completable 结合起来

java - 如何在 2 月 28 日在 android 中添加 1 个月

java - 在 JavaEE 中创建具有适当分层的 Web 服务

kotlin - 不能使用提供的参数调用以下函数

android - NetworkOnMainThreadException 与 .subscribeOn(Schedulers.newThread())

C# SHA-256 与 Java SHA-256。不同的结果?

java - 为什么显示 "cannot issue executeUpdate() for SELECTs"SQLException?

android - RxJava 运算符像 amb,但只有有效结果

java - 如何在 Rxjava 中观察套接字直到取消订阅