我有一个高度并行的聚合,其中有很多键在多个节点上运行。然后,我想对类似于以下代码的所有值进行汇总聚合:
val myStream = sourceStream
.keyBy( 0 )
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
.addSink(new OtherSink)
val summaryStream = myStream
.map(Row.fromOtherRow(_))
// parallelism is 1 by definition
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
.addSink(new RowSink)
这工作正常,但我注意到最终执行 windowAll() 的节点获得大量入站网络流量以及该节点 CPU 的显着峰值。这显然是因为所有数据都聚合在一起并且并行度为“1”。
Flink 是否有任何当前或计划中的规定来做更多的两层汇总聚合,将所有数据保存在每个节点上,在将结果发送到第二层进行最终聚合之前对其进行预聚合?这是我希望找到的一些伪代码:
val myStream = sourceStream
.keyBy( 0 )
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
.addSink(new OtherSink)
val summaryStream = myStream
.map(Row.fromOtherRow(_))
// parallelism would be at the default for the env
.windowLocal(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
// parallelism is 1 by definition
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
.addSink(new RowSink)
我将其命名为“windowLocal()”,但我相信还有更好的名称。它将像 windowAll() 一样是非键控的。主要好处是它会减少网络和 CPU 以及内存命中 windowAll(),通过将其分布在您正在运行的所有节点上。我目前必须为我的节点分配更多资源以适应此摘要。
如果当前版本可以通过其他方式实现这一点,我很乐意听到。我已经考虑过对第二层的 key 使用随机值,但我相信这会导致数据完全重新平衡,因此它解决了我的 CPU 和内存问题,但没有解决网络问题。我正在寻找与 rescale() 类似的东西,其中数据保留在任务管理器或插槽的本地。
最佳答案
使用 FoldFunction 进行增量窗口聚合
以下示例展示了如何将增量 FoldFunction 与 WindowFunction 结合使用以提取窗口中的事件数并返回窗口的键和结束时间。
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
使用 ReduceFunction 的增量窗口聚合
以下示例展示了如何将增量 ReduceFunction 与 WindowFunction 组合以返回窗口中的最小事件以及窗口的开始时间。
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
window: TimeWindow,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((window.getStart, min))
}
)
想了解更多请看这里
在此输入代码
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html
关于scala - 寻找 windowAll() 的替代形式,将数据保存在同一节点上以进行聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45517342/