scala - 根据 Source 的物化值创建 Sink

标签 scala akka akka-stream

我想围绕 Pubsub 主题构建 Flow

\-----------------------------------------------------\
 \  ------------------  ------------    -------------  \
  > > wrapWithPublish > > toPubsub |    | fromPubsub >  >
 /  ------------------  ------------    -------------  /
/-----------------------------------------------------/

这是我到目前为止编写的代码

def mediatorFlow[In, Out](mediator: ActorRef, topic: String): Flow[In, Out, Unit] = {
  val source =
    Source
      .actorRef[Out](10, OverflowStrategy.dropHead)
      .mapMaterializedValue { ref => mediator ! DistributedPubSubMediator.Subscribe(topic, ref); ref }

  val wrapWithPublish =
    Flow[In].map(DistributedPubSubMediator.Publish(topic, _))

  val unsubscribe = DistributedPubSubMediator.Unsubscribe(topic, ref???)

  val toPubsub =
    Sink.actorRef[DistributedPubSubMediator.Publish](mediator, unsubscribe)

  Flow.fromSinkAndSource(wrapWithPublish to toPubsub, source)
}

问题出在unsubscribe的定义中,我想在流末尾发送一个DistributedPubSubMediator.Subscribe,它指定一个ref 应该是上面定义的 source 的具体化值。

我知道当 Actor 在流结束时死亡时,Pubsub 会自动取消订阅 Actor。但我还是很好奇如何解决这个问题。

最佳答案

要实现这一点,您需要构建一个比 fromSinkAndSource 更紧密的 Flow,您需要使用 GraphDSL:

val source = ... // as above
Flow.fromGraph(GraphDSL.create(source) { implicit b =>
  src =>
    import GraphDSL.Implicits._
    val concat = b.add(Concat[Any](2))
    val wrapWithPublish = b.add(Flow[In].map(DistributedPubSubMediator.Publish(topic, _)))
    val toPubSub = b.add(Sink.actorRef[Any](mediator, unsubscribe))

    wrapWithPublish ~> concat ~> toPubSub
    b.materializedValue.map(DistributedPubSubMediator.Unsubscribe(topic, _)) ~> concat

    FlowShape(wrapWithPublish.in, src.out)
})

通过这种方式,您可以将其中一个部分的物化值注入(inject)流元素级别,使其可以发送到 pubsub 中介器。

关于scala - 根据 Source 的物化值创建 Sink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35255422/

相关文章:

list - scala 通过从带有元组的现有列表中省略 n 元素来创建新列表

java - 脚本每次对多个请求使用相同的名称,而不是每个请求使用不同的名称

scala - 如何在 Scala 复制中导入游戏

java - 如何在 Java Play Framework 中记录请求和响应

scala - 使用 Akka HTTP 上传文件

java - 物化值在 Akka Stream 中如何工作

scala - 在 spark 中使用 partitionBy 和 coalesce

java - 使用 Akka http 向请求传递数据

json - 将 Akka actorRef 发送到 json

java - Akka如何连接源、流和汇