java - 事件时间过期的消息导致 java 堆空间 OutOfMemory 错误

标签 java apache-flink

我正在运行一个具有正常翻滚事件时间窗口(窗口大小为 1 小时)的作业。运行足够长的时间后,它将抛出有关 java 堆空间不足的错误。现在,有关正在处理的数据的问题是,今天中午将出现一条消息,接下来的 15k 条左右将来自一周前(这不是数据预期的样子,但它应该以任何一种方式处理)。因此,即使有允许的延迟,水印也远远超过了接下来 15k 消息的事件时间,因此应该丢弃延迟的消息。或者至少我是这么想的,因为他们不再在那个窗口中。

所以我的问题是这样的。 Flink 是否会维护过期的消息,即使它们没有被窗口使用?或者只是为了他们的翻滚窗口,我应该设置其他内容或某些属性以确保过期数据不会耗尽内存?

感谢您的帮助!

编辑

DataStream<OutputObject> outputStream = sourceData
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Record>(Time.minutes(1)) {
        @Override
        public long extractTimestamp(Record record) {
            long eventTimeFromRecord = record.eventTimestamp;

            return eventTimeFromRecord;
        }
    })
    .keyBy("fieldToKeyBy")
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .apply(new ApplyFunction());

最佳答案

当源的并行度为 n 时,就会有 n 个水印——每个并行子任务都有一个水印。如果 Flink 作业在今天中午收到一 strip 有时间戳的消息,然后收到一周前的许多事件,则该消息只会提前其中一个并行任务的水印,而其他 n-1 个任务仍将具有 Long .min_value 作为他们的水印。因此,那些“迟到”事件只会在并行窗口运算符之一中被识别为迟到,而其他 n-1 个窗口将继续处理这些“迟到”事件。

请注意,如果您刚刚从检查点或保存点恢复,也可能会发生这种情况,因为水印未保存在检查点或保存点中。这意味着您不能指望以前作业的消息流量来更新水印。

关于java - 事件时间过期的消息导致 java 堆空间 OutOfMemory 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46531141/

相关文章:

apache-flink - 管理具有大量内存使用的状态 - 从存储中查询

java - 为什么我会收到未检查的作业警告?

Java 9.0 |类加载器::getResourceAsStream: NullPointerException

java - twitter4j 程序上出现 "unexpected token: int"错误

apache-flink - Apache Flink : guideliness for setting parallelism?

flink 进程中 Java 反射的 java.lang.NoSuchMethodError 异常

java - 在二叉树中搜索字符串

java - 发疯 : Why is array in member class zero

apache-flink - Flink时间特性和AutoWatermarkInterval

elasticsearch - 如何从 Apache Flink 写入 Elasticsearch