我开始尝试使用 RxJava 和 ReactFX,并且对它们非常着迷。但在我进行实验时,我有很多问题,而且我一直在寻找答案。
我观察到的一件事(没有双关语意)当然是惰性执行。通过下面的探索性代码,我注意到在调用 merge.subscribe(pet -> System.out.println(pet))
之前没有执行任何操作。但令我着迷的是,当我订阅第二个订阅者 merge.subscribe(pet -> System.out.println("Feed "+ pet))
时,它再次触发了“迭代”。
我想了解的是迭代的行为。它的行为似乎不像只能使用一次的 Java 8 stream
。它真的是一次遍历每个 String
并将其作为该时刻的值发布吗?在之前解雇的订阅者之后是否有任何新订阅者收到这些项目,就好像它们是新的一样?
public class RxTest {
public static void main(String[] args) {
Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
.filter(dog -> dog.matches("D.*"));
Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));
Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));
Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);
merge.subscribe(pet -> System.out.println(pet));
merge.subscribe(pet -> System.out.println("Feed " + pet));
}
}
最佳答案
Observable<T>
代表一个 monad,一个链式操作,而不是操作本身的执行。它是描述性语言,而不是您习惯的命令式语言。要执行操作,您 .subscribe()
给它。每次您订阅时,都会从头开始创建一个新的执行流。不要将流与线程混淆,因为订阅是同步执行的,除非您使用.subscribeOn()
指定线程更改或 .observeOn()
.您将新元素链接到任何现有操作/monad/Observable 以添加 new behaviour ,例如更改线程、过滤、累积、转换等。如果您的可观察对象是您不想在每个订阅上重复的昂贵操作,您可以使用 .cache()
来防止重新创建。 .
使任何异步/同步Observable<T>
将操作转换为同步内联操作,使用 .toBlocking()
将其类型更改为 BlockingObservable<T>
.而不是 .subscribe()
它包含使用 .forEach()
对每个结果执行操作的新方法, 或强制使用 .first()
Observables 是一个很好的工具,因为它们大多*是确定性的(相同的输入总是产生相同的输出,除非你做错了什么)、可重用(你可以将它们作为命令/策略模式的一部分发送)并且用于大部分忽略并发,因为他们不应该依赖共享状态(也就是做错事)。如果您尝试将基于可观察对象的库引入命令式语言,或者只是对您有 100% 信心管理良好的可观察对象执行操作,则 BlockingObservables 非常有用。
围绕这些原则构建您的应用程序是范式的改变,我无法在这个答案中真正涵盖。
*There are breaches like
Subject
andObservable.create()
that are needed to integrate with imperative frameworks.
关于java - RxJava Observable "Iteration"是如何工作的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29711347/