scala - Akka Streams - 如何在图表中保留辅助接收器的物化值

标签 scala akka akka-stream

我有一个返回Flow的函数,其逻辑涉及将图形的一些元素传递到作为参数传递的辅助Sink。我想保留辅助 Sink 的物化值,以便在启动构建的流时能够对其值采取行动。

这是我正在构建的流程的粗略图片:

IN ~> (logic: In => Out) ~> Broadcast ~> AuxFilter ~> AuxSink
                                      ~> OutFilter ~> OUT

示例代码:

case class Incoming()
trait Element
case class Outcoming() extends Element
case class Persistent() extends Element

def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, NotUsed] = {
  val isPersistent = Flow[Element].collect {
    case persistent: Persistent => persistent
  }

  val isRunning = Flow[Element].collect {
    case out: Outcoming => out
  }

  val magicFlow: Flow[Incoming, Element, NotUsed] = Flow[Incoming]
    .map(_ => if (Random.nextBoolean()) Outcoming() else Persistent())

  Flow.fromGraph {
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val sink = b.add(isRunning)

                   bcast.out(0) ~> isPersistent ~> auxSink
      magic.out ~> bcast.in
                   bcast.out(1) ~> isRunning ~> sink.in

      FlowShape(magic.in, sink.out)
    }
  }
}

有没有办法以某种方式将 auxSinkMat 传递到生成的 Flow

谢谢。

最佳答案

回答我自己的问题...

找到了! Flow.alsoToMat 的来源准确地指出了我所需要的逻辑 - 要访问辅助图的物化值(在我的例子中 auxSink ),必须导入其通过将其作为参数传递给 GraphDSL.create() 来构建图形。

def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, Mat] = {
  val isPersistent = ...
  val isRunning = ...
  val magicFlow = ...

  Flow.fromGraph {
    GraphDSL.create(auxSink) { implicit b => aux =>
      import GraphDSL.Implicits._

      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val sink = b.add(isRunning)

      magic ~> bcast ~> isPersistent ~> aux
               bcast ~> isRunning ~> sink

      FlowShape(magic.in, sink.out)
    }
  }
}

关于scala - Akka Streams - 如何在图表中保留辅助接收器的物化值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40183214/

相关文章:

scala - scala 与 JVM 的联系有多紧密?

scala - sbt:发布生成的源

mongodb - Akka和ReactiveMongo

scala - Akka Streams 中的 RestartFlow 未按预期工作

java - 如何在 Scala 中捕获中断的 java 进程

java - Play Framework 2.1.2 国际化不适用于芬兰语字符

sockets - Akka 远程连接

java - Akka 身份验证设计(有限状态机)

scala - 如何在 Akka Stream 中记录流速?

java - Scala 中线程执行器池的替代品