java - 如何将 Observable<Observable<List<T>>> 转换为 Observable<List<T>>

标签 java rx-java reactive-programming rx-java2

我陷入了如何将以下可观察类型转换/转换为我的目标类型的困境:

我有可观察的类型:

Observable<Observable<List<FooBar>>>

我想将其转换为:

Observable<List<FooBar>>

所以当我订阅它时,它会发出 List<FooBar>不是Observable<List<FooBar>>

我尝试使用 map 、flatMap...我找不到解决方案。

但是,我发现了一个看起来很奇怪的运算符,叫做 blockingFirst我的 IDE 表明它返回 Observable<List<FooBar>>当应用于Observable<Observable<List<FooBar>>>

但是“阻塞”部分让我感到困惑。

我也在寻找比 blockingFirst 更好的解决方案一个(如果有的话)。

最佳答案

flatMap确实是要走的路:

Observable<Observable<List<FooBar>>> streamOfStreams = ...
Observable<List<FooBar>> listStream = 
          streamOfStreams.flatMap(listObservable -> listObservable);

我认为你应该从不同的角度来看待它,它不是简单地从一种类型转换为另一种类型。 ObservableObservables表示发出流的流,每个流发出一些项目的列表。您想要实现的是将其展平为单个流,从所有流中发出所有列表。
flatMap准确地做到这一点,你给它一个项目发射,并返回 Observable , flatMap将订阅返回的Observable并将将从它发出的每个项目合并到源流,在这种情况下,因为您只需返回每个项目发射,即 Observable<List<FooBar>> ,您实际上采用了每个发出的 Observable ,订阅它,并将其所有列表排放合并回来,这样您就可以从所有 Observables 获得所有列表的返回流。 .

blockingFirst绝对不是要走的路,它所做的是等待(阻止)直到第一次发射并仅返回该项目,因为您的项目 ar Observable<List<FooBar>>您将获得唯一的第一个 Observable 。因此,虽然它确实具有相同的类型,但它显然不是您想要的相同的流。

关于java - 如何将 Observable<Observable<List<T>>> 转换为 Observable<List<T>>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43712065/

相关文章:

java - 即使指定了 subscribeOn,代码也在主线程上运行

android - rx-java:在IntentService的onHandleIntent中订阅Observable

c# - 在 RX.net 和 WPF 中长时间运行 API 调用的正确方法

java Process.waitfor 是一个阻塞调用

java - AspectJ 在 Eclipse 中不能正常工作

java多态性使用父类(super class)变量创建新的子类对象

java - FIrebaseUI(版本 0.4.0): firebase recycler Adapter not fetching data from firebase database

java - 检查输入的数字是否为奇数

reactive-programming - 如何使用 RxJava 串行批处理长进程?

javascript - RXJS 可观察拉伸(stretch)