scala - Akka Streams 中平衡和广播扇出的区别

标签 scala akka akka-stream

我对 Akka streams 中的扇出策略有点困惑,我读到了Broadcast –(1 个输入,N 个输出)给定一个输入元素向每个输出发出,而 Balance –(1 个输入,N 个输出)给定一个输入元素发射到其输出端口之一。

你能解释一下吗:

  • 多个消费者之间如何平衡?
  • 短语“发射到其输出端口之一”的含义
  • 端口是否与下游相同?
  • “平衡”是否代表将输入流复制到几个输出分区
  • “平衡使图形能够分开并复制下游订阅者的多个实例以处理卷”是什么意思?
  • 最佳答案

    从文档中...广播向每个消费者发出(发送)元素。 balance 只发送给第一个可用的消费者。

    broadcast

    Emit each incoming element each of n outputs.



    balance

    Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer.



    从评论编辑:

    根据您的要点,您应该创建两个 averageCarrierDelay 函数,每个函数一个 ZF .然后你可以看到发送给每个元素的所有元素。
    val averageCarrierDelayZ =
        Flow[FlightDelayRecord]
          .groupBy(30, _.uniqueCarrier)
            .fold(("", 0, 0)){
              (x: (String, Int, Int), y:FlightDelayRecord) => {
                println(s"Z Received Element: ${y}")
                val count = x._2 + 1
                val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
                (y.uniqueCarrier, count, totalMins)
              }
            }.mergeSubstreams
    
    
    val averageCarrierDelayF =
        Flow[FlightDelayRecord]
          .groupBy(30, _.uniqueCarrier)
            .fold(("", 0, 0)){
              (x: (String, Int, Int), y:FlightDelayRecord) => {
                println(s"F Received Element: ${y}")
                val count = x._2 + 1
                val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
                (y.uniqueCarrier, count, totalMins)
              }
            }.mergeSubstreams
    

    编辑 2:为了将来检查事情,我建议为流阶段使用通用记录器,以便您可以看到发生了什么。
    def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
    

    这样做可以让您执行以下操作:
    D ~> logElement[FlightDelayRecord]("F received: ") ~> F
    D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
    

    通过这种方式,您可以检查图形区域是否存在您可能会或可能不会预料到的奇怪行为。

    关于scala - Akka Streams 中平衡和广播扇出的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40222075/

    相关文章:

    scala - 已超过 Websocket 最大帧长度 65536

    scala - 让 Akka-2 实例每 n 个时间单位向自己发送一条消息,而不会溢出邮箱

    scala - Akka流-如何访问流的物化值

    scala - 改进此代码的方法

    scala - Scala 中 "static"方法的特征?

    scala - 在 Scala 编译器插件中为合成类定义构造函数参数?

    scheduled-tasks - 在集群设置中执行 2 个 Java 计划的 Akka 作业

    scala - 以编程方式停止 Alpakka Kafka 流的正确方法

    java - 从 Play 2.4.1 迁移到 2.5.6 - 套接字

    scala - 如何知道akka中SourceQueue的已用缓冲区大小?