在 WindowAssigner
中,一个元素被分配给一个或多个 TimeWindow
实例。如果是滑动事件时间窗口,则会在 SlidingEventTimeWindows#assignWindows
1 中发生。
如果窗口具有 size=5
和 slide=1
,则具有 timestamp=0
的元素将分配到以下窗口中:
- 窗口(开始=0,结束=5)
- 窗口(开始=-1,结束=4)
- 窗口(开始=-2,结束=3)
- 窗口(开始=-3,结束=2)
- 窗口(开始=-4,结束=1)
在一张图片中:
+-> Beginning of time
|
|
+----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[-1,4[ Window 2 XXXXX |
| t=[-2,3[ Window 3 XXXXX |
| t=[-3,2[ Window 4 XXXXX |
| t=[-4,1[ Window 5 XXXXX |
| |
| time(-4 to +4) ---- |
| 432101234 |
+---------------------------+------------------+
|
|
|
+
有没有办法告诉Flink有一个时间的开始,之前没有窗口?如果不是,从哪里开始改变这一点?在上述情况下,Flink 应该只有一个窗口(t=[4,8[ Window 1
)用于第一个元素。像这样:
+-> Beginning of time
|
|
+-----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[ 1,6[ Window 2 XXXXX |
| t=[ 2,7[ Window 3 XXXXX |
| t=[ 3,8[ Window 4 XXXXX |
| t=[ 4,9[ Window 5 XXXXX |
| |
| time(-4 to +8) ---- |
| 4321012345678 |
+---------------------------+-------------------+
|
|
|
+
一旦窗口数量达到并超过窗口大小,此操作将不再有效。那么,在上面的例子中,所有元素都在 5 个窗口内。
脚注:
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows
最佳答案
目前无法指定 Flink 作业的有效时间间隔。鉴于您可能也想将您的工作应用于历史数据,这也可能有点问题。
不过,您可以做的是手动过滤在超时开始之前启动的窗口:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val startTime = 1
val windowLength = 2
val slide = 1
val input = env.fromElements((1,1), (2,2), (3,3))
.assignAscendingTimestamps(x => x._2)
val windowed = input
.timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
.apply{ (window, iterable, collector: Collector[Int]) =>
if (window.getStart >= startTime) {
collector.collect(iterable.map(_._1).reduce(_ + _))
} else {
// discard early windows
}
}
windowed.print()
env.execute()
关于apache-flink - Apache 弗林克 : Window Functions and the beginning of time,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40658477/