我想围绕 Pubsub 主题构建 Flow
\-----------------------------------------------------\
\ ------------------ ------------ ------------- \
> > wrapWithPublish > > toPubsub | | fromPubsub > >
/ ------------------ ------------ ------------- /
/-----------------------------------------------------/
这是我到目前为止编写的代码
def mediatorFlow[In, Out](mediator: ActorRef, topic: String): Flow[In, Out, Unit] = {
val source =
Source
.actorRef[Out](10, OverflowStrategy.dropHead)
.mapMaterializedValue { ref => mediator ! DistributedPubSubMediator.Subscribe(topic, ref); ref }
val wrapWithPublish =
Flow[In].map(DistributedPubSubMediator.Publish(topic, _))
val unsubscribe = DistributedPubSubMediator.Unsubscribe(topic, ref???)
val toPubsub =
Sink.actorRef[DistributedPubSubMediator.Publish](mediator, unsubscribe)
Flow.fromSinkAndSource(wrapWithPublish to toPubsub, source)
}
问题出在unsubscribe
的定义中,我想在流末尾发送一个DistributedPubSubMediator.Subscribe
,它指定一个ref
应该是上面定义的 source
的具体化值。
我知道当 Actor 在流结束时死亡时,Pubsub 会自动取消订阅 Actor。但我还是很好奇如何解决这个问题。
最佳答案
要实现这一点,您需要构建一个比 fromSinkAndSource
更紧密的 Flow,您需要使用 GraphDSL:
val source = ... // as above
Flow.fromGraph(GraphDSL.create(source) { implicit b =>
src =>
import GraphDSL.Implicits._
val concat = b.add(Concat[Any](2))
val wrapWithPublish = b.add(Flow[In].map(DistributedPubSubMediator.Publish(topic, _)))
val toPubSub = b.add(Sink.actorRef[Any](mediator, unsubscribe))
wrapWithPublish ~> concat ~> toPubSub
b.materializedValue.map(DistributedPubSubMediator.Unsubscribe(topic, _)) ~> concat
FlowShape(wrapWithPublish.in, src.out)
})
通过这种方式,您可以将其中一个部分的物化值注入(inject)流元素级别,使其可以发送到 pubsub 中介器。
关于scala - 根据 Source 的物化值创建 Sink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35255422/