apache-flink - Apache 弗林克 : Window Functions and the beginning of time

标签 apache-flink flink-streaming

WindowAssigner 中,一个元素被分配给一个或多个 TimeWindow 实例。如果是滑动事件时间窗口,则会在 SlidingEventTimeWindows#assignWindows1 中发生。

如果窗口具有 size=5slide=1,则具有 timestamp=0 的元素将分配到以下窗口中:

  1. 窗口(开始=0,结束=5)
  2. 窗口(开始=-1,结束=4)
  3. 窗口(开始=-2,结束=3)
  4. 窗口(开始=-3,结束=2)
  5. 窗口(开始=-4,结束=1)

在一张图片中:

                            +-> Beginning of time
                            |
                            |
+----------------------------------------------+
|     size = 5              +--+ element       |
|    slide = 1              |                  |
|                           v                  |
| t=[ 0,5[ Window 1         XXXXX              |
| t=[-1,4[ Window 2        XXXXX               |
| t=[-2,3[ Window 3       XXXXX                |
| t=[-3,2[ Window 4      XXXXX                 |
| t=[-4,1[ Window 5     XXXXX                  |
|                                              |
| time(-4 to +4)        ----                   |
|                       432101234              |
+---------------------------+------------------+
                            |
                            |
                            |
                            +

有没有办法告诉Flink有一个时间的开始,之前没有窗口?如果不是,从哪里开始改变这一点?在上述情况下,Flink 应该只有一个窗口(t=[4,8[ Window 1)用于第一个元素。像这样:

                            +-> Beginning of time
                            |
                            |
+-----------------------------------------------+
|     size = 5              +--+ element        |
|    slide = 1              |                   |
|                           v                   |
| t=[ 0,5[ Window 1         XXXXX               |
| t=[ 1,6[ Window 2          XXXXX              |
| t=[ 2,7[ Window 3           XXXXX             |
| t=[ 3,8[ Window 4            XXXXX            |
| t=[ 4,9[ Window 5             XXXXX           |
|                                               |
| time(-4 to +8)        ----                    |
|                       4321012345678           |
+---------------------------+-------------------+
                            |
                            |
                            |
                            +

一旦窗口数量达到并超过窗口大小,此操作将不再有效。那么,在上面的例子中,所有元素都在 5 个窗口内。


脚注:

  1. org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows

最佳答案

目前无法指定 Flink 作业的有效时间间隔。鉴于您可能也想将您的工作应用于历史数据,这也可能有点问题。

不过,您可以做的是手动过滤在超时开始之前启动的窗口:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val startTime = 1
val windowLength = 2
val slide = 1

val input = env.fromElements((1,1), (2,2), (3,3))
               .assignAscendingTimestamps(x => x._2)

val windowed = input
      .timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
      .apply{ (window, iterable, collector: Collector[Int]) =>
         if (window.getStart >= startTime) {
           collector.collect(iterable.map(_._1).reduce(_ + _))
         } else {
           // discard early windows
         }
       }

windowed.print()

env.execute()

关于apache-flink - Apache 弗林克 : Window Functions and the beginning of time,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40658477/

相关文章:

java - Apache Flink(集群中的标准输出错误)

java - Flink 按字段 id 对记录进行分组的最佳方式

java - flink SourceFunction<> 在 StreamExecutionEnvironment.addSource() 中被替换?

apache-flink - 我可以将自定义分区器与分组依据一起使用吗?

apache-flink - Apache 弗林克 : Is MapState automatically updated when I modify a stored object?

scala - 使用 Flink 从 kafka 主题的开头进行消费

Python - Apache Beam - Flink 运行器设置 : ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

json - Apache Flink:无法从 ObjectNode::get 中提取 key

apache-flink - flink-zeppelin-没有响应

apache-flink - Akka 流与 Apache Flink