scala - Akka流通过限流并行/处理流的吞吐量

标签 scala akka akka-stream

我有一个用例,我想向外部系统发送消息,但发送此消息的流采用并返回我无法在下游使用的类型。这是传递流的一个很好的用例。我正在使用实现 here .最初我担心如果 processingFlow 使用 mapAsyncUnordered 那么这个流将不起作用。由于处理流程可能会对消息重新排序,并且 zip with 可能会推出具有不正确对的元组。例如在下面的例子中。

  val testSource = Source(1 until 50)
  val processingFlow: Flow[Int, Int, NotUsed] = Flow[Int].mapAsyncUnordered(10)(x => Future {
    Thread.sleep(Random.nextInt(50))
    x * 10
  })
  val passThroughFlow = PassThroughFlow(processingFlow, Keep.both)

  val future = testSource.via(passThroughFlow).runWith(Sink.seq)

我希望处理流程可以根据其输入重新排序其输出,我会得到如下结果:
[(30,1), (40,2),(10,3),(10,4), ...]

右边(通过的总是按顺序传递)但左边通过我的 mapAsyncUnordered 可能与不正确的元素连接以形成一个错误的元组。

相反,我实际上得到:
[(10,1), (20,2),(30,3),(40,4), ...]

每次。经过进一步调查,我注意到代码运行缓慢,实际上尽管我的 map 异步无序,但它根本没有并行运行。我尝试在前后引入缓冲区以及异步边界,但它似乎总是按顺序运行。这解释了为什么它总是有序但我希望我的处理流程具有更高的吞吐量。

我想出了以下解决方法:
object PassThroughFlow {

  def keepRight[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth[A, A1](processingFlow).map(_._2)

  def keepBoth[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder => {
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[A](2))
      val zip = builder.add(ZipWith[A1, A, (A1, A)]((left, right) => (left, right)))

      broadcast.out(0) ~> processingFlow ~> zip.in0
      broadcast.out(1) ~> zip.in1

      FlowShape(broadcast.in, zip.out)
    }
    })
}

object ParallelPassThroughFlow {


  def keepRight[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth(parallelism, processingFlow).map(_._2)

  def keepBoth[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val fanOut = builder.add(Balance[A](outputPorts = parallelism))
      val merger = builder.add(Merge[(A1, A)](inputPorts = parallelism, eagerComplete = false))

      Range(0, parallelism).foreach { n =>
        val passThrough = PassThroughFlow.keepBoth(processingFlow)
        fanOut.out(n) ~> passThrough ~> merger.in(n)
      }

      FlowShape(fanOut.in, merger.out)
    })
  }

}

两个问题:
  • original implementation ,为什么pass through flow里面的zip会限制map async unordered的并行量?
  • 我的工作是健全的还是可以改进的?我基本上将我的输入扇出到多个传递流的堆栈中,然后将它们全部合并在一起。它似乎具有我想要的属性(并行但即使处理流程重新排序也能保持顺序)但有些地方感觉不对
  • 最佳答案

    您目睹的行为是由 broadcast 造成的和 zip工作:broadcast当它的所有输出发出需求信号时向下游发射; zip在发出需求信号(并向下游发送)之前等待其所有输入。

    broadcast.out(0) ~> processingFlow ~> zip.in0
    broadcast.out(1) ~> zip.in1
    

    考虑第一个元素 ( 1 ) 在上图中的移动。 1向双方广播 processingFlowzip . zip立即接收其输入之一( 1 )并等待其另一个输入( 10 ),这将需要更长的时间才能到达。仅当 zip两者都得到 110它是否从上游拉取更多元素,从而触发第二个元素( 2 )通过流的移动。等等。

    至于你的ParallelPassThroughFlow ,我不知道你为什么“感觉有些不对劲”。

    关于scala - Akka流通过限流并行/处理流的吞吐量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55219937/

    相关文章:

    scala - 如何为泛型类型编写 scalaz.IsEmpty 参数

    scala - 出现错误 JsValue Expected 但 JsNode supplied

    scala - 在 Scala REPL 中导入多个包

    web-services - spray-can webservice 优雅关闭

    Scala/Akka/Guice 动态注入(inject) child Actor

    scala - Akka流-计时器或计划程序(如CRON)

    scala - 使用 Akka 将文件从服务器流式传输到客户端

    scala - 在 Akka 中保留类型参数接收

    scala - 在接收方法中调用 Future 并在此之后停止 actor

    java - 在流程中的任何阶段失败时重试 akka 中的流