我们开发了一个 Flink 应用程序,它接收具有类似 id, value1, value2, ...
字段的消息。在 Flink 应用程序中,它们的键为 id
,并分配不同大小的滑动窗口和 slide
10 秒。我们使用 AggregateFunction 计算每个 id 的一些统计信息,并将结果数据流下沉到 Redis。代码如下:
DataStream<Tuple2<String, String>> statistics = messages.keyBy(0)
.timeWindow(Time.seconds(300), Time.seconds(10))
.aggregate(new Min5Aggregate())
.setParallelism(20);
statistics.addSink(new CustomRedisSink()).setParallelism(20);
这样,其他系统就可以通过读取Redis来使用和显示统计数据,并且结果每10秒更新一次。但现在我们的实现遇到了性能问题。我认为原因之一是为某些没有非常活跃的值更新的 id 创建了过多的窗口。
假设窗口大小为 300 秒,并且有一个 id 每隔几个小时才会有新值。但是每次这个id的新值到达时,都会创建30(300s/10s)个窗口,并且这些窗口具有相同的聚合输出,因为在它们过期之前没有更多的新值。我们现在要做的就是在结果流剥皮时将输出与Redis中的值进行比较,如果相同则跳过更新。
为了优化性能,我想知道Flink中是否有任何方法可以阻止窗口在与之前的窗口内容相同时触发FIRE。这样就没有必要与 Redis 进行比较了。或者,如果您对此系统有任何其他优化建议,也会非常有帮助。
*由于还有其他 id 的值经常更新,并且我们需要有最新的统计数据,因此增加幻灯片大小不是一个选择。
最佳答案
我不相信触发器可以保留前一个窗口的状态。 ProcessWindowFunction
可以保留早期窗口的状态,因此这是一个选项。
一个相当简单的解决方案是在窗口和接收器之间插入一个 RichFlatMapFunction
,它会记住先前的结果,并且仅在新结果不同时才生成输出。
要对滑动窗口进行更精细的优化,您可以将窗口实现为 KeyedProcessFunction
。这样,您就可以保留大约 30 个 10 秒切片以及完全聚合的结果 300 秒,然后每隔 10 秒您需要做的就是减去最旧的 10 秒并添加最新的 10 秒。对于繁忙的按键,这应该比将每个事件添加到 30 个窗口中更有效 - 但自己完成所有簿记工作肯定会更多。 Flink 文档包括 an example of doing this for tumbling event time windows ;滑动窗口的扩展留给读者。
关于redis - Flink 窗口滑动尺寸小、内容相同的优化?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65681792/