(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner moved element from 2017-09-25T13:53:08.725Z to earlier time 2017-09-25T13:53:08.718Z for window [2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)
预期的原因是什么?
WindowFn 代码很简单:
public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
/**
*
*/
private static final long serialVersionUID = 1L;
private IntervalWindow assignWindow(AssignContext context) {
TableRow tableRow = (TableRow) context.element();
String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
String currentTime = DateUtil.getFormatedDate(new Date());
DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
.withZoneUTC();
Instant start_point = Instant.parse(timestamp, formatter);
Instant end_point = Instant.parse(currentTime, formatter);
return new IntervalWindow(start_point, end_point);
};
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
return Arrays.asList(assignWindow(c));
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return false;
}
@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
throw new IllegalArgumentException(
"Attempted to get side input window for GlobalWindow from non-global WindowFn");
}
}
最佳答案
GroupByKey
的默认行为是输出时间戳为窗口中允许的最大时间戳的迭代。对于您的窗口,这是时间戳 13:53:08.718Z
。
该元素的时间戳 13:53:08.725Z
不在 13:53:08.088Z
到 13:53 的窗口内: 08.719Z
.
您可以分享您的 WindowFn
以及任何可调整时间戳的 ParDo
吗?
更新:感谢您分享您的WindowFn
。有一些事情会给您带来问题。
<强>1。分配窗口的开始时间不基于元素的时间戳。
您提取元素的列并根据context.element().get(BQConstants.LOG_TIME) 的值分配窗口(忽略强制转换和解析)。从您的错误消息来看,这似乎不是 context.timestamp()
的实际值,它是元素的事件时间时间戳。
相反,您应该编写 WindowFn
以使用 context.timestamp()
。您可以根据您的数据是否有界以不同的方式确保时间戳是您想要的:
- 如果您的数据有界,您可以使用
WithTimestamps
通过提取该字段来分配时间戳。 - 如果您的数据不受限制,则源需要了解更多信息才能管理水印,因此配置取决于源。例如,
PubsubIO
从您可以指定的属性中读取时间戳。
<强>2。分配窗口的结束时间基于系统日期
几个问题:
- 结束时间向下舍入,并且可能早于开始时间,从而导致窗口无效。
- 结束时间不确定。 Beam 中的一般期望是,您将主要根据元素的时间戳(必须位于窗口结束之前)、其次根据元素本身来确定性地分配窗口。分配这样的不确定性窗口可能有不可预见的缺点。一个已知问题是您的结果不可重现,如果您需要修复数据处理错误或对存档数据运行实验,这可能会带来麻烦。这取决于您的用例,但您可能会考虑一些更面向 future 的东西。
这里的目标是什么?您设置此功能只是为了提取动态目标的端点吗?如果是这样,我建议根据发生的时间而不是处理的时间对数据进行分区。
关于java - 在数据流运行程序中运行管道时有时会出现 IllegalStateException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46418568/