spring - 如何避免Spring Reactor中类似订阅者重复映射操作?

标签 spring kotlin reactive-programming project-reactor

我有一个发布者发出字符串,而许多订阅者可能使用相同的映射函数来创建具有不同过滤器的模型。

发布者:

val publisher: Flux<String> = ...

订阅者#1

val sub1 = publisher.map{veryExpensiveConverter.convert(it)}
                    .filter(it.metric<10)

订阅者#2

val sub2 = publisher.map{veryExpensiveConverter.convert(it)}
                    .filter(it.metric>5)

订阅者#3

val sub3 = sub2.map{cheapConverter.convert(it)}
                    .filter(it.metric>8)

订阅者#4

val sub4 = sub3.map{yetAnotherConverter.convert(it)}
                    .filter(it.metric>80)

最后我订阅了所有的通量

Flux.merge(sub1, sub2, sub3, ..., subn)
     .map{//some logic for following data of subscribers}
     .subscribe()

问题:对于每个订阅者的同一条已发布记录,veryExppressiveConverter 会执行多次。 执行流程如下

Input1 -> veryExpensiveConverter -> filter1 -> output1
       -> veryExpensiveConverter -> filter2 -> output2
       -> veryExpensiveConverter -> cheapConverter -> filter3 -> output3

我也想要

Input1 -> veryExpensiveConverter -> filter1 -> output1  
                                 -> filter2 -> output2
                                 -> cheapConverter -> filter3 -> output3

什么模式最适合避免为每个订阅者执行相同的映射?

最佳答案

您可以在某种程度上 .share() 以确保对该共享部分的每个订阅仅触发其上方的单个订阅。

您还可以查看 .publish().xxx() 方法以获取更高级的自动触发器(.share() 将在第一个订阅进来)。

类似这样的事情:

val expensiveDoneOnce = publisher
    .map{veryExpensiveConverter.convert(it)}
    .publish()
    .refCount(2)
val sub1 = expensiveDoneOnce.filter(it.metric < 10)
val sub2 = expensiveDoneOnce.filter(it.metric > 5)

关于spring - 如何避免Spring Reactor中类似订阅者重复映射操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65877264/

相关文章:

reactive-programming - 如何使用 StartWith() rx 运算符但仅当序列不为空时?

angular - 如何在使用 jib 时创建 docker-compose.yml 文件?

spring - 找不到使用域类资源创建的Grails REST API返回

java - 使用 Spring 的 Google App Engine GAE MemcacheService 单例注入(inject)

java - Spring 存储库并不总是抛出 DataIntegrityViolationException

kotlin - 如何将 `throw` 放入辅助函数中但仍然具有空安全性?

android - 如何使用TabLayout在Fragment中添加ListView?

android - 如何在Kotlin中为Android应用程序生成protobuf?

c# - RX Switch() 的订阅和取消订阅命令

javascript - 如何观察对象的方法返回的属性变化