java - RxJava。将列表的热可观察对象转换为单项流,对其进行处理并转换为列表

标签 java rx-java observable

假设我有一些项目列表的可观察对象;

Observable<List<Item>> observable = ...;

我需要将它转换为单个项目流并对每个项目执行一些操作,比如过滤,之后我应该将它转换回列表并在订阅者的 onNext 方法中处理它:

observable.flatMap(Observable::from)
    .filter(Item::isFiltered)
    .toList()
    .subscribe(this::onNext, this::onError)

public void onNext(List<Item> items) {...}

乍一看似乎没问题,但事实并非如此,因为我们的 observable 是 hot,所以 toList() 永远不会执行(因为它等待源 observable 完成)并且整个流卡住。

我该如何解决这个问题?另请注意,filter 附近可能是对单个项目的任意数量的附加操作。

最佳答案

您可以对单个项目和最终的 toList 执行所有操作您在 flatMap 中创建的 Observable 上的运算符.. 这样,您会收到 onComplete 电话和 toList将收集和转换项目。

observable.flatMap(list -> {
     return Observable.from(list)
                        .filter(Item::isFiltered)
                        .toList()
  })
  .subscribe(this::onNext, this::onError)

关于java - RxJava。将列表的热可观察对象转换为单项流,对其进行处理并转换为列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41637511/

相关文章:

java - 为什么我传递给 Arrays.sort 的方法引用需要是静态的?

java - 寻找 Rxjava 运算符将源合并到一个流中

swift - Rx swift : Completing observable sequence of unknown length

递归可观察调用中的Angular 4加载树结构

rxjs - 使用 scan Observable 返回默认值

java - Android studio中eclipse有没有像ctrl + shift + T之类的东西可以看到打开类型窗口

java - 带构造函数的类 newInstance()

java - 如何在 Java 软件架构中实现可插入功能和模块?

android - dispose 后订阅 observable

Java Stream 相当于 ReactiveX Observable#scan