java-8 - Java 8 流和 RxJava 可观察量之间的区别

标签 java-8 java-stream rx-java observable

Java 8 流与 RxJava 可观察量相似吗?

Java 8 流定义:

Classes in the new java.util.stream package provide a Stream API to support functional-style operations on streams of elements.

最佳答案

简短回答

所有序列/流处理库都为管道构建提供非常相似的 API。差异在于用于处理多线程和管道组合的 API。

长答案

RxJava 与 Stream 有很大不同。在所有 JDK 事物中,最接近 rx.Observable也许是 java.util.stream.Collector Stream + CompletableFuture组合(这是以处理额外的 monad 层为代价的,即必须处理 Stream<CompletableFuture<T>>CompletableFuture<Stream<T>> 之间的转换)。

Observable 和 Stream 之间存在显着差异:

  • 流是基于拉取的,Observables 是基于推取的。这听起来可能太抽象,但它会产生非常具体的重大后果。
  • Stream只能使用一次,Observable可以订阅多次。
  • Stream#parallel()将序列拆分为多个分区,Observable#subscribeOn()Observable#observeOn()不要;模仿 Stream#parallel() 很棘手Observable 的行为,它曾经有 .parallel()方法,但这种方法引起了如此多的困惑,以至于 .parallel()支持已移至单独的存储库:ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava 。更多详情见another answer .
  • Stream#parallel()与大多数接受可选 Scheduler 的 RxJava 方法不同,不允许指定要使用的线程池。由于 JVM 中的所有流实例都使用相同的 fork-join 池,因此添加 .parallel()可能会意外影响程序另一个模块的行为。
  • 流缺少与时间相关的操作,例如 Observable#interval() , Observable#window()以及许多其他的;这主要是因为 Streams 是基于拉取的,上游无法控制何时向下游发出下一个元素。
  • 与 RxJava 相比,Stream 提供了有限的操作集。例如。流缺少截止操作( takeWhile()takeUntil() );使用 Stream#anyMatch() 的解决方法受到限制:它是终端操作,因此每个流不能多次使用它
  • 从 JDK 8 开始,没有 Stream#zip()操作,有时还是很有用的。
  • Stream 很难自己构建,Observable 可以通过多种方式构建 编辑: 正如评论中所述,有多种构建 Stream 的方法。但是,由于不存在非终端短路,因此您不能。 G。轻松生成文件中的行流(JDK 提供了开箱即用的 Files#lines()BufferedReader#lines(),其他类似的场景可以通过从 Iterator 构造 Stream 来管理)。
  • Observable 提供资源管理工具 ( Observable#using() );您可以用它包装 IO 流或互斥体,并确保用户不会忘记释放资源 - 它将在订阅终止时自动释放;流有onClose(Runnable)方法,但您必须手动或通过 try-with-resources 调用它。例如你必须记住Files#lines() 必须包含在 try-with-resources block 中。
  • Observables 一直都是同步的(我实际上并没有检查 Streams 是否也是如此)。这使您无需思考基本操作是否是线程安全的(答案始终是"is",除非存在错误),但无论您的代码是否需要,与并发相关的开销都会存在。

综述

RxJava 与 Streams 有很大不同。真正的 RxJava 替代方案是 ReactiveStreams 的其他实现,e。 G。 Akka 的相关部分。

更新

有一个技巧可以使用 Stream#parallel 的非默认 fork-join 池,参见Custom thread pool in Java 8 parallel stream .

更新

以上所有内容均基于 RxJava 1.x 的经验。现在RxJava 2.x is here ,这个答案可能已经过时了。

关于java-8 - Java 8 流和 RxJava 可观察量之间的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30216979/

相关文章:

java - 如何从订阅者返回 Observable?

string - Stream - 打印排序前后的值

java - 以 react 方式从流中删除已经在数据库中的对象

java - 重做 foreach 到 .stream().map

java - RxJava 与 JavaFx 的性能较差

android - Android 中的 Throttle Subject

angular - 我们是否需要取消订阅已完成/出错的可观察对象?

java-8 - 将逻辑转换为 Streams (Java)(嵌套 for 循环与计数器)

Java 8 流 Map<K,V> 到 List<T>

java - Java 日历中的前一天不正确