scala - 如何解释官方文档中的 Akka Streams 图?

标签 scala akka akka-stream

我对官方托管的示例代码有几个问题 here :

val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)

RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
  (topHS, bottomHS) =>
  import GraphDSL.Implicits._
  val broadcast = builder.add(Broadcast[Int](2))
  Source.single(1) ~> broadcast.in

  broadcast.out(0) ~> sharedDoubler ~> topHS.in
  broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
  ClosedShape
})
  1. 什么时候通过create传入图表?

为什么create时传入了topHeadSink、bottomHeadSink,而sharedDoubler却没有传入?它们有什么区别?

  • 什么时候需要builder.add
  • 我可以在没有 builder.add 的情况下在图表外部创建广播吗?如果我在图中添加几个流,是否也应该通过 builder.add 添加流?令人困惑的是,有时我们需要 builder.add,有时则不需要。

    更新

    我觉得这仍然令人困惑:

    这些方法之间的区别在于,使用 builder.add(...) 导入会忽略导入图形的具体化值,而通过工厂方法导入则允许包含它。

    topHS、bottomHS 是从 create 导入的,因此它们将保留其物化值。如果我执行 builder.add(topHS) 会怎样?

    你如何解释sharedDoubler:它是否具有物化值?如果我使用 builder.add 会怎样?

  • GraphDSL.create(topHeadSink, BottomHeadSink)((_, _))((_,_)) 是什么意思?
  • 它看起来像我们只需要的样板,但我不确定它是什么。

    最佳答案

    1. When do you pass in a graph through create?

    当您想要获取传递给 create 工厂方法的图形的具体化值时。您问题中的 RunnableGraph 的类型是 RunnableGraph[(Future[Int], Future[Int])],这意味着该图的物化值为 ( future [Int], future [Int]):

    val g = RunnableGraph.fromGraph(...).run() // (Future[Int], Future[Int])
    val topHeadSinkResult    = g._1 // Future[Int]
    val bottomHeadSinkResult = g._2 // Future[Int]
    

    现在考虑以下变体,它定义图形“内部”的接收器并丢弃物化值:

    val g2 = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
    
      val topHeadSink = Sink.head[Int]
      val bottomHeadSink = Sink.head[Int]
      val broadcast = builder.add(Broadcast[Int](2))
    
      Source.single(1) ~> broadcast.in
      broadcast.out(0) ~> sharedDoubler ~> topHeadSink
      broadcast.out(1) ~> sharedDoubler ~> bottomHeadSink
      ClosedShape
    }).run() // NotUsed
    

    g2 的值为 NotUsed

    1. When do you need builder.add?

    图表的所有组件都必须添加到构建器中,但 ~> 有变体运算符将最常用的组件(例如 SourceFlow)添加到底层的构建器中。但是,执行扇入(例如合并)或扇出(例如广播)的连接操作必须显式传递如果您使用的是 Graph DSL,则添加到 builder.add

    请注意,对于简单图形,您可以使用联结而无需使用图形 DSL。这是 documentation 中的示例:

    val sendRmotely = Sink.actorRef(actorRef, "Done")
    val localProcessing = Sink.foreach[Int](_ => /* do something usefull */ ())
    
    val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast[Int](_))
    
    Source(List(0, 1, 2)).runWith(sink)
    
    1. What does this mean? the ((_,_)) of GraphDSL.create(topHeadSink, bottomHeadSink)((_, _))?

    它是一个柯里化(Currying)参数,指定要保留的具体化值。此处使用 ((_, _)) 等同于:

    val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((t, b) => (t, b)) {
      implicit builder => (topHS, bottomHS) =>
      ...
    }).run() // (Future[Int], Future[Int])
    

    换句话说,在此上下文中 ((_, _))((t, b) => (t, b)) 的简写,其中保留传入的两个接收器各自的物化值。例如,如果您只想保留 topHeadSink 的物化值,则可以将调用更改为以下内容:

    val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((t, _) => t) {
      implicit builder => (topHS, bottomHS) =>
      ...
    }).run() // Future[Int]
    

    关于scala - 如何解释官方文档中的 Akka Streams 图?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48633778/

    相关文章:

    具有持久邮箱的 Akka 无状态参与者

    scala - 如何在 akka http 中向 POST 请求添加重试?

    scala - 运行 Akka 微内核时获取命令行参数?

    java - Akka Streams onFailuresWithBackoff 未重新启动流程

    scala - 为什么Scala无法通过try/catch优化尾调用?

    java - 使用 Spark 连接 MariaDB 时出现 ClassNotFoundException

    scala - 为什么 Scala 编译器没有标记看起来不是尾递归函数的东西?

    scala - 为什么自定义 DefaultSource 会给出 java.io.NotSerializableException?

    scala - 如何正确终止包含循环的图?

    scala - 什么是 Akka Streams 中的 Flow#join