scala - 并非所有 akka 流接收器都会接收发出的数据

标签 scala akka-stream

当运行以下 akka 流 FlowGraph 时,并非所有发出的 Char 都会被所有接收器接收。

package sample.stream

import java.io.{ FileOutputStream, PrintWriter }
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Broadcast, FlowGraph, Sink, Source }
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Failure, Success, Try }

object Sample {

  def main(args: Array[String]): Unit = {
    println("start")
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorFlowMaterializer()
    var counter = -1

    val countSource: Source[Char, Unit] = Source(() => Iterator.continually { counter += 1; (counter + 'A').toChar }.take(11))

    var counter1 = 0
    val consoleSink1 = Sink.foreach[Char] { counter =>
      println("sink1:" + counter1 + ":" + counter)
      counter1 += 1
      Thread.sleep(100)
      //Thread.sleep(300)
    }
    var counter2 = 0
    val consoleSink2 = Sink.foreach[Char] { counter =>
      println("sink2:" + counter2 + ":" + counter)
      counter2 += 1
      Thread.sleep(200)
    }

    val materialized = FlowGraph.closed(consoleSink1, consoleSink2)((x1, x2) => x1) { implicit builder =>
      (console1, console2) =>
        import FlowGraph.Implicits._
        val broadcast = builder.add(Broadcast[Char](2))
        countSource ~> broadcast ~> console1
        broadcast ~> console2
    }.run()

    // ensure the output file is closed and the system shutdown upon completion
    materialized.onComplete {
      case Success(_) =>
        system.shutdown()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.shutdown()
    }
    println("waiting the remaining ones")
    //scala.concurrent.Await.ready(materialized, scala.concurrent.duration.DurationInt(100).seconds)
    //system.shutdown()
    println("end")
  }
}

运行后生成以下输出

[info] Running sample.stream.Sample
[info] start
[info] waiting the remaining ones
[info] end
[info] sink2:0:A
[info] sink1:0:A
[info] sink1:1:B
[info] sink1:2:C
[info] sink2:1:B
[info] sink1:3:D
[info] sink2:2:C
[info] sink1:4:E
[info] sink1:5:F
[info] sink2:3:D
[info] sink1:6:G
[info] sink1:7:H
[info] sink2:4:E
[info] sink2:5:F
[info] sink1:8:I
[info] sink1:9:J
[info] sink2:6:G
[info] sink2:7:H
[info] sink1:10:K

第二个接收器没有收到第 8、9 和 10 个值:IJK,但整个流程仍然结束。

我应该怎么做才能等待两个Sink消耗完所有数据? 我发现如果我将 (x1,x2)=>x1 更改为 (x1,x2)=>x2 这将等待。这与在第一个接收器中休眠 300 毫秒是一样的。

最佳答案

传递给 FlowGraph.close 的第二个参数列表的函数确定运行流程时返回的具体化值。因此,当您传入 (x1,x2)=>x1 时,您会返回一个 future,当第一个接收器获取所有元素时,该 future 就会完成,然后该 future 的回调会在没有第二个接收器的情况下关闭 actor 系统有机会获得所有元素。

相反,您应该将两个 future 都取出来,并仅在两个 future 都完成时才关闭系统。

您实际上可以看到在一些 akka-stream 测试中如何使用此方法 here .

关于scala - 并非所有 akka 流接收器都会接收发出的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29903676/

相关文章:

scala - Scala 中的模式匹配与案例类

scala - Akka Streams 入口和导出匹配

scala-2.11 - 如何从 akka.stream.io.Framing$FramingException 中恢复

playframework - 基于 "resource"连接的客户端正在处理的 BroadcastHub 过滤?

scala - 创建一个生成长度为 N 的整数有序列表的 Scala 函数

Scala future 结合

scala - 我的特征列在数据框中变为空

scala - Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)

java - Akka流滑动窗口通过SourceQueue控制receive到sink

scala - 以下 Java "continue"代码如何转换为 Scala?