scala - 寻找 windowAll() 的替代形式,将数据保存在同一节点上以进行聚合

标签 scala apache-flink

我有一个高度并行的聚合,其中有很多键在多个节点上运行。然后,我想对类似于以下代码的所有值进行汇总聚合:

val myStream = sourceStream
  .keyBy( 0 )    
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))          
  .reduce(_ + _)
  .addSink(new OtherSink)

val summaryStream = myStream
  .map(Row.fromOtherRow(_))
                     // parallelism is 1 by definition
  .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))   
  .reduce(_ + _)
  .addSink(new RowSink)

这工作正常,但我注意到最终执行 windowAll() 的节点获得大量入站网络流量以及该节点 CPU 的显着峰值。这显然是因为所有数据都聚合在一起并且并行度为“1”。

Flink 是否有任何当前或计划中的规定来做更多的两层汇总聚合,将所有数据保存在每个节点上,在将结果发送到第二层进行最终聚合之前对其进行预聚合?这是我希望找到的一些伪代码:

val myStream = sourceStream
  .keyBy( 0 )    
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))          
  .reduce(_ + _)
  .addSink(new OtherSink)

val summaryStream = myStream
  .map(Row.fromOtherRow(_))
                     // parallelism would be at the default for the env
  .windowLocal(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce(_ + _)
                     // parallelism is 1 by definition
  .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce(_ + _)
  .addSink(new RowSink)

我将其命名为“windowLocal()”,但我相信还有更好的名称。它将像 windowAll() 一样是非键控的。主要好处是它会减少网络和 CPU 以及内存命中 windowAll(),通过将其分布在您正在运行的所有节点上。我目前必须为我的节点分配更多资源以适应此摘要。

如果当前版本可以通过其他方式实现这一点,我很乐意听到。我已经考虑过对第二层的 key 使用随机值,但我相信这会导致数据完全重新平衡,因此它解决了我的 CPU 和内存问题,但没有解决网络问题。我正在寻找与 rescale() 类似的东西,其中数据保留在任务管理器或插槽的本地。

最佳答案

使用 FoldFunction 进行增量窗口聚合

以下示例展示了如何将增量 FoldFunction 与 WindowFunction 结合使用以提取窗口中的事件数并返回窗口的键和结束时间。

val input: DataStream[SensorReading] = ...

input
 .keyBy(<key selector>)
 .timeWindow(<window assigner>)
 .fold (
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
    ( key: String,
      window: TimeWindow,
      counts: Iterable[(String, Long, Int)],
      out: Collector[(String, Long, Int)] ) =>
      {
        val count = counts.iterator.next()
        out.collect((key, window.getEnd, count._3))
      }
  )

使用 ReduceFunction 的增量窗口聚合

以下示例展示了如何将增量 ReduceFunction 与 WindowFunction 组合以返回窗口中的最小事件以及窗口的开始时间。

val input: DataStream[SensorReading] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
  .reduce(
    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
    ( key: String,
      window: TimeWindow,
      minReadings: Iterable[SensorReading],
      out: Collector[(Long, SensorReading)] ) =>
      {
        val min = minReadings.iterator.next()
        out.collect((window.getStart, min))
      }
  )

想了解更多请看这里 在此输入代码 https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html

关于scala - 寻找 windowAll() 的替代形式,将数据保存在同一节点上以进行聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45517342/

相关文章:

scala - 如何将 java Map 转换为 LinkedHashMap[String,ArrayList[String]] 类型的 scala Map?

scala - 文本数据源只支持单列,你有8列

Scala 替代无限循环

scala - 使用两个 scala 库运行 scalatest 和 Maven - 一个用于 Maven,另一个用于 Scala Eclipse 插件

apache-flink - 链接映射状态大小和键数

apache-flink - 在 Flink 中使用计数器获取 numOfRecordsIn

scala - Play Framework 模板中模板之间的可重用函数

java - Flink - 多源集成测试

docker - 如何从 Kubernetes 中的 flink docker 镜像启动我的 jar 应用程序?

resources - Apache 弗林克 : Limit number of CPUs in a TaskManager