我有一个发布者发出字符串,而许多订阅者可能使用相同的映射函数来创建具有不同过滤器的模型。
发布者:
val publisher: Flux<String> = ...
订阅者#1
val sub1 = publisher.map{veryExpensiveConverter.convert(it)}
.filter(it.metric<10)
订阅者#2
val sub2 = publisher.map{veryExpensiveConverter.convert(it)}
.filter(it.metric>5)
订阅者#3
val sub3 = sub2.map{cheapConverter.convert(it)}
.filter(it.metric>8)
订阅者#4
val sub4 = sub3.map{yetAnotherConverter.convert(it)}
.filter(it.metric>80)
最后我订阅了所有的通量
Flux.merge(sub1, sub2, sub3, ..., subn)
.map{//some logic for following data of subscribers}
.subscribe()
问题:对于每个订阅者的同一条已发布记录,veryExppressiveConverter 会执行多次。 执行流程如下
Input1 -> veryExpensiveConverter -> filter1 -> output1
-> veryExpensiveConverter -> filter2 -> output2
-> veryExpensiveConverter -> cheapConverter -> filter3 -> output3
我也想要
Input1 -> veryExpensiveConverter -> filter1 -> output1
-> filter2 -> output2
-> cheapConverter -> filter3 -> output3
什么模式最适合避免为每个订阅者执行相同的映射?
最佳答案
您可以在某种程度上 .share()
以确保对该共享部分的每个订阅仅触发其上方的单个订阅。
您还可以查看 .publish().xxx()
方法以获取更高级的自动触发器(.share()
将在第一个订阅进来)。
类似这样的事情:
val expensiveDoneOnce = publisher
.map{veryExpensiveConverter.convert(it)}
.publish()
.refCount(2)
val sub1 = expensiveDoneOnce.filter(it.metric < 10)
val sub2 = expensiveDoneOnce.filter(it.metric > 5)
关于spring - 如何避免Spring Reactor中类似订阅者重复映射操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65877264/