java - RxJava Observable "Iteration"是如何工作的?

标签 java rx-java

我开始尝试使用 RxJava 和 ReactFX,并且对它们非常着迷。但在我进行实验时,我有很多问题,而且我一直在寻找答案。

我观察到的一件事(没有双关语意)当然是惰性执行。通过下面的探索性代码,我注意到在调用 merge.subscribe(pet -> System.out.println(pet)) 之前没有执行任何操作。但令我着迷的是,当我订阅第二个订阅者 merge.subscribe(pet -> System.out.println("Feed "+ pet)) 时,它再次触发了“迭代”。

我想了解的是迭代的行为。它的行为似乎不像只能使用一次的 Java 8 stream。它真的是一次遍历每个 String 并将其作为该时刻的值发布吗?在之前解雇的订阅者之后是否有任何新订阅者收到这些项目,就好像它们是新的一样?

public class RxTest {

    public static void main(String[] args) {

        Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
                .filter(dog -> dog.matches("D.*"));

        Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));

        Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));

        Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);

        merge.subscribe(pet -> System.out.println(pet));


        merge.subscribe(pet -> System.out.println("Feed " + pet));

    }
}

最佳答案

Observable<T>代表一个 monad,一个链式操作,而不是操作本身的执行。它是描述性语言,而不是您习惯的命令式语言。要执行操作,您 .subscribe()给它。每次您订阅时,都会从头开始创建一个新的执行流。不要将流与线程混淆,因为订阅是同步执行的,除非您使用.subscribeOn() 指定线程更改.observeOn() .您将新元素链接到任何现有操作/monad/Observable 以添加 new behaviour ,例如更改线程、过滤、累积、转换等。如果您的可观察对象是您不想在每个订阅上重复的昂贵操作,您可以使用 .cache() 来防止重新创建。 .

使任何异步/同步Observable<T>将操作转换为同步内联操作,使用 .toBlocking()将其类型更改为 BlockingObservable<T> .而不是 .subscribe()它包含使用 .forEach() 对每个结果执行操作的新方法, 或强制使用 .first()

Observables 是一个很好的工具,因为它们大多*是确定性的(相同的输入总是产生相同的输出,除非你做错了什么)、可重用(你可以将它们作为命令/策略模式的一部分发送)并且用于大部分忽略并发,因为他们不应该依赖共享状态(也就是做错事)。如果您尝试将基于可观察对象的库引入命令式语言,或者只是对您有 100% 信心管理良好的可观察对象执行操作,则 BlockingObservables 非常有用。

围绕这些原则构建您的应用程序是范式的改变,我无法在这个答案中真正涵盖。

*There are breaches like Subject and Observable.create() that are needed to integrate with imperative frameworks.

关于java - RxJava Observable "Iteration"是如何工作的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29711347/

相关文章:

java.lang.NoSuchMethodError : javax. 持久性.PersistenceContext.同步

Java 位操作 - (num >>= 1) 做什么?

java - 在 CSV 文件中写入/追加/搜索文本的最有效的内存/CPU 方式

android - 在 RxJava 中处理 Completable、Single、Maybe 和终止 Observable 的最佳实践

java - ehcache 3如何切换到TickingTimeSource?

java - 在 Java 中通过引用传递原始数据

spring - 在RxJava Services中管理事务性的正确方法是什么?

rx-java - 创建一个可以接受参数的 Observable

android - RXJava 如何在 x 时间后尝试获取下一个

Java8- "effectively final"