java - 可观察以重用运算符执行

标签 java rx-java reactive-programming kotlin

给定以下示例(kotlin 代码)

val subject = PublishSubject.create<Int>()

val stream = subject.map {
    println("mapping")
    it * 2
}

stream.forEach { println("A: $it") }
stream.forEach { println("B: $it") }

subject.onNext(1)
subject.onCompleted()

输出将是

mapping
A: 2
mapping
B: 2

我想要实现的是,source observable 被映射一次,所有订阅者都得到结果,但不为每个订阅者执行映射操作......

像这样

mapping
A: 2
B: 2

在我的例子中,我在延迟和性能至关重要的地方进行非常昂贵的计算,我有一个热可观察作为源和大量订阅者......

我们如何重用运算符执行?以及通常不同的映射操作?

最佳答案

您可以使用 cache缓存任何 future 订阅者可观察到的源结果:

val stream = subject.map {
    println("mapping")
    it * 2
}.cache()

如果你想更精细地控制缓存的方式 replay值得研究。

如果您不想缓存源 Observable 的每个项目,而只想重新发布 新项目,您可以使用 publishautoConnect :

val stream = subject.map {
    println("mapping")
    it * 2
}.publish()
 .autoConnect()

给定以下事件序列:

stream.forEach { println("A: $it") }
stream.forEach { println("B: $it") }

subject.onNext(1)

stream.forEach { println("C: $it") }
subject.onNext(2)
subject.onCompleted()

将打印:

mapping
A: 2
B: 2
mapping
A: 4
B: 4
C: 4

关于java - 可观察以重用运算符执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38196397/

相关文章:

multithreading - 如果消息队列关闭,如何在事件驱动微服务中处理?

java - Apache Camel - 使用 Spring Security 进行 Camel 路由的基本身份验证

java - java中的断言

java - 异步调用集合中的每个项目

java - 使用 Java 中的响应式(Reactive)扩展对对象集合进行排序?

java - 从 Flowable 一次发出一个列表项

java - 将文件从服务器传输到客户端会导致下载速度非常低

java - 用 0-(N-1) 中的唯一数字替换重复数字

angular - RxJS distinctUntilChanged - 对象比较

ios - 如何更改我的返回方法上的可观察类型