java - 强制驱逐滑动事件窗口以在 Flink 上进行处理(历史流)

标签 java apache-flink flink-streaming

目前我正在使用Flink进行流处理引擎的研究。在我的研究中,我使用历史流,它由以下形式的元组组成:

事件时间、attribute_1、...、attribute_X

其中 event_time 在处理过程中用作 TimeCharacteristic.EventTime。此外,我通过以下任一方式将数据集插入处理拓扑:(i) 创建内存结构,或 (ii) 通过读取 CSV 文件本身。

不幸的是,我注意到即使足够的元组已到达完成整个窗口的窗口运算符,该窗口也不会被推送到下游进行处理。结果,性能显着下降,并且多次出现 OutOfMemoryError 异常(具有大量历史流)。

为了说明典型用例,我提供以下示例:

StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);
List<Tuple2<Long, Integer>> l = new ArrayList<>();
    l.add(new Tuple2<>(1L, 11));
    l.add(new Tuple2<>(2L, 22));
    l.add(new Tuple2<>(3L, 33));
    l.add(new Tuple2<>(4L, 44));
    l.add(new Tuple2<>(5L, 55));
    DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
    stream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
                return t.f0;
            }
        })
        .windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2), 
                Time.milliseconds(1)))
        .sum(1)
        .print();
    env.execute();

根据l的内容,我需要得到以下窗口结果:

  • [0, 2) 总和:11
  • [1, 3) 总和:33
  • [2, 4) 总和:55
  • [3, 5) 总和:77
  • [4, 6) 总和:99
  • [5, 7) 总和:55

每个列表项都可以读取为[开始时间戳,结束时间戳),总和:X。

我希望每次出现一个时间戳超出打开窗口的结束时间戳的元组时,Flink 都会生成一个窗口结果。例如,我希望当将带有时间戳 4L 的元组输入窗口运算符时,生成窗口 [1, 3) 的求和。然而,当来自l的所有元组被插入流的拓扑中时,处理开始。当我使用较大的历史流时,也会发生同样的事情,这会导致性能下降(甚至耗尽内存)。

问题:如何强制 Flink 在窗口完成时将窗口推送到下游进行处理?

我相信对于 SlidingEventTimeWindows 来说,窗口的逐出是由水印触发的。如果前面的情况成立,我该如何编写我的拓扑,以便它们在具有较晚时间戳的元组到达时触发窗口?

谢谢

最佳答案

AscendingTimestampExtractor 使用周期性水印策略,其中 Flink 将每 n 毫秒调用一次 getCurrentWatermark() 方法,其中 n 为 autowatermarkinterval .

默认间隔为 200 毫秒,与窗口大小相比非常长。然而,它们不能直接比较——200 毫秒是按处理时间而不是事件时间来测量的。尽管如此,我怀疑如果您没有更改此配置设置,那么在发出第一个水印之前会创建很多窗口,我认为这可以解释您所看到的内容。

您可以减少自动水印间隔(也许减少到 1 毫秒)。或者您可以实现 AssignerWithPunctuatedWatermarks ,这会给你更多的控制权。

关于java - 强制驱逐滑动事件窗口以在 Flink 上进行处理(历史流),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48647930/

相关文章:

java - 弗林克 : Jar file execution on Flink cluster

java - Ant + Junit 问题

java - IntelliJ IDEA 调试器在停止时不会终止进程

javax.persistence.PersistenceException : org. hibernate.PersistentObjectException : detached entity passed to persist?

java - 事件处理问题

kubernetes - Apache Flink 作业未调度到 Kubernetes 中的多个 TaskManager(副本)上

apache-flink - Flink 如何在 S3 中将 DataSet 写成 Parquet 文件?

apache-flink - 如何在 Apache Flink 中并行写入接收器

apache-flink - 无法将保存点从 1.2.1 恢复到 1.4

apache-flink - Flink 一个作业中的多个作业或多个管道