redis - Flink 窗口滑动尺寸小、内容相同的优化?

标签 redis apache-flink

我们开发了一个 Flink 应用程序,它接收具有类似 id, value1, value2, ... 字段的消息。在 Flink 应用程序中,它们的键为 id ,并分配不同大小的滑动窗口和 slide 10 秒。我们使用 AggregateFunction 计算每个 id 的一些统计信息,并将结果数据流下沉到 Redis。代码如下:

DataStream<Tuple2<String, String>> statistics = messages.keyBy(0)
    .timeWindow(Time.seconds(300), Time.seconds(10))
    .aggregate(new Min5Aggregate())
    .setParallelism(20);

statistics.addSink(new CustomRedisSink()).setParallelism(20);

这样,其他系统就可以通过读取Redis来使用和显示统计数据,并且结果每10秒更新一次。但现在我们的实现遇到了性能问题。我认为原因之一是为某些没有非常活跃的值更新的 id 创建了过多的窗口。

假设窗口大小为 300 秒,并且有一个 id 每隔几个小时才会有新值。但是每次这个id的新值到达时,都会创建30(300s/10s)个窗口,并且这些窗口具有相同的聚合输出,因为在它们过期之前没有更多的新值。我们现在要做的就是在结果流剥皮时将输出与Redis中的值进行比较,如果相同则跳过更新。

为了优化性能,我想知道Flink中是否有任何方法可以阻止窗口在与之前的窗口内容相同时触发FIRE。这样就没有必要与 Redis 进行比较了。或者,如果您对此系统有任何其他优化建议,也会非常有帮助。

*由于还有其他 id 的值经常更新,并且我们需要有最新的统计数据,因此增加幻灯片大小不是一个选择。

最佳答案

我不相信触发器可以保留前一个窗口的状态。 ProcessWindowFunction 可以保留早期窗口的状态,因此这是一个选项。

一个相当简单的解决方案是在窗口和接收器之间插入一个 RichFlatMapFunction ,它会记住先前的结果,并且仅在新结果不同时才生成输出。

要对滑动窗口进行更精细的优化,您可以将窗口实现为 KeyedProcessFunction。这样,您就可以保留大约 30 个 10 秒切片以及完全聚合的结果 300 秒,然后每隔 10 秒您需要做的就是减去最旧的 10 秒并添加最新的 10 秒。对于繁忙的按键,这应该比将每个事件添加到 30 个窗口中更有效 - 但自己完成所有簿记工作肯定会更多。 Flink 文档包括 an example of doing this for tumbling event time windows ;滑动窗口的扩展留给读者。

关于redis - Flink 窗口滑动尺寸小、内容相同的优化?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65681792/

相关文章:

node.js - 我如何在 node.js 中使用 redis?

python - 向所有 worker 广播任务 : redis+celery

list - 删除 Redis 列表中的所有条目

scala - Flink 可以处理单个表/窗口约 50 GB 的状态吗?

apache-flink - 以下几种在 Apache Flink 中进行字数统计的方法有什么区别?

python - 使用 'spawn'启动Redis进程但面临TypeError:无法腌制_thread.lock对象

apache-flink - Apache 弗林克 : How to apply multiple counting window functions?

apache-kafka - 当我重新运行 Flink 消费者时,Kafka 再次消费最新的消息

java - org.apache.flink.util.FlinkException : Releasing shared slot parent

ruby - 如何使用redis ruby​​计算redis中的set complement操作?