Java 8 流与 RxJava 可观察量相似吗?
Java 8 流定义:
Classes in the new
java.util.stream
package provide a Stream API to support functional-style operations on streams of elements.
最佳答案
简短回答
所有序列/流处理库都为管道构建提供非常相似的 API。差异在于用于处理多线程和管道组合的 API。
长答案
RxJava 与 Stream 有很大不同。在所有 JDK 事物中,最接近 rx.Observable
也许是 java.util.stream.Collector
Stream
+ CompletableFuture
组合(这是以处理额外的 monad 层为代价的,即必须处理 Stream<CompletableFuture<T>>
和 CompletableFuture<Stream<T>>
之间的转换)。
Observable 和 Stream 之间存在显着差异:
- 流是基于拉取的,Observables 是基于推取的。这听起来可能太抽象,但它会产生非常具体的重大后果。
- Stream只能使用一次,Observable可以订阅多次。
-
Stream#parallel()
将序列拆分为多个分区,Observable#subscribeOn()
和Observable#observeOn()
不要;模仿Stream#parallel()
很棘手Observable 的行为,它曾经有.parallel()
方法,但这种方法引起了如此多的困惑,以至于.parallel()
支持已移至单独的存储库:ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava 。更多详情见another answer . -
Stream#parallel()
与大多数接受可选 Scheduler 的 RxJava 方法不同,不允许指定要使用的线程池。由于 JVM 中的所有流实例都使用相同的 fork-join 池,因此添加.parallel()
可能会意外影响程序另一个模块的行为。 - 流缺少与时间相关的操作,例如
Observable#interval()
,Observable#window()
以及许多其他的;这主要是因为 Streams 是基于拉取的,上游无法控制何时向下游发出下一个元素。 - 与 RxJava 相比,Stream 提供了有限的操作集。例如。流缺少截止操作(
takeWhile()
、takeUntil()
);使用Stream#anyMatch()
的解决方法受到限制:它是终端操作,因此每个流不能多次使用它 - 从 JDK 8 开始,没有
Stream#zip()
操作,有时还是很有用的。 Stream 很难自己构建,Observable 可以通过多种方式构建编辑: 正如评论中所述,有多种构建 Stream 的方法。但是,由于不存在非终端短路,因此您不能。 G。轻松生成文件中的行流(JDK 提供了开箱即用的Files#lines()
和BufferedReader#lines()
,其他类似的场景可以通过从 Iterator 构造 Stream 来管理)。- Observable 提供资源管理工具 (
Observable#using()
);您可以用它包装 IO 流或互斥体,并确保用户不会忘记释放资源 - 它将在订阅终止时自动释放;流有onClose(Runnable)
方法,但您必须手动或通过 try-with-resources 调用它。例如你必须记住Files#lines()
必须包含在 try-with-resources block 中。 - Observables 一直都是同步的(我实际上并没有检查 Streams 是否也是如此)。这使您无需思考基本操作是否是线程安全的(答案始终是"is",除非存在错误),但无论您的代码是否需要,与并发相关的开销都会存在。
综述
RxJava 与 Streams 有很大不同。真正的 RxJava 替代方案是 ReactiveStreams 的其他实现,e。 G。 Akka 的相关部分。
更新
有一个技巧可以使用 Stream#parallel
的非默认 fork-join 池,参见Custom thread pool in Java 8 parallel stream .
更新
以上所有内容均基于 RxJava 1.x 的经验。现在RxJava 2.x is here ,这个答案可能已经过时了。
关于java-8 - Java 8 流和 RxJava 可观察量之间的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30216979/