java - 在数据流运行程序中运行管道时有时会出现 IllegalStateException

标签 java google-cloud-platform google-cloud-dataflow illegalstateexception

(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner moved element from 2017-09-25T13:53:08.725Z to earlier time 2017-09-25T13:53:08.718Z for window [2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)

预期的原因是什么?

WindowFn 代码很简单:

public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

/**
 * 
 */
private static final long serialVersionUID = 1L;

private IntervalWindow assignWindow(AssignContext context) {
    TableRow tableRow = (TableRow) context.element();
    String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
    String currentTime = DateUtil.getFormatedDate(new Date());
    DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
            .withZoneUTC();
    Instant start_point = Instant.parse(timestamp, formatter);
    Instant end_point = Instant.parse(currentTime, formatter);

    return new IntervalWindow(start_point, end_point);
};

@Override
public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
}

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
    return Arrays.asList(assignWindow(c));
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
    return false;
}

@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new IllegalArgumentException(
            "Attempted to get side input window for GlobalWindow from non-global WindowFn");
}

}

最佳答案

GroupByKey 的默认行为是输出时间戳为窗口中允许的最大时间戳的迭代。对于您的窗口,这是时间戳 13:53:08.718Z

该元素的时间戳 13:53:08.725Z 不在 13:53:08.088Z13:53 的窗口内: 08.719Z.

您可以分享您的 WindowFn 以及任何可调整时间戳的 ParDo 吗?

更新:感谢您分享您的WindowFn。有一些事情会给您带来问题。

<强>1。分配窗口的开始时间不基于元素的时间戳。

您提取元素的列并根据context.element().get(BQConstants.LOG_TIME) 的值分配窗口(忽略强制转换和解析)。从您的错误消息来看,这似乎不是 context.timestamp() 的实际值,它是元素的事件时间时间戳。

相反,您应该编写 WindowFn 以使用 context.timestamp()。您可以根据您的数据是否有界以不同的方式确保时间戳是您想要的:

  • 如果您的数据有界,您可以使用 WithTimestamps 通过提取该字段来分配时间戳。
  • 如果您的数据不受限制,则源需要了解更多信息才能管理水印,因此配置取决于源。例如,PubsubIO 从您可以指定的属性中读取时间戳。

<强>2。分配窗口的结束时间基于系统日期

几个问题:

  • 结束时间向下舍入,并且可能早于开始时间,从而导致窗口无效。
  • 结束时间不确定。 Beam 中的一般期望是,您将主要根据元素的时间戳(必须位于窗口结束之前)、其次根据元素本身来确定性地分配窗口。分配这样的不确定性窗口可能有不可预见的缺点。一个已知问题是您的结果不可重现,如果您需要修复数据处理错误或对存档数据运行实验,这可能会带来麻烦。这取决于您的用例,但您可能会考虑一些更面向 future 的东西。

这里的目标是什么?您设置此功能只是为了提取动态目标的端点吗?如果是这样,我建议根据发生的时间而不是处理的时间对数据进行分区。

关于java - 在数据流运行程序中运行管道时有时会出现 IllegalStateException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46418568/

相关文章:

redis - 有没有办法使用内置的 Apache Beam Redis I/O 转换来执行 Redis GET 命令?

google-cloud-dataflow - 在 Apache Beam 中按顺序触发窗口

Java,这个线程安全吗

java - Freemarker 多个配置实例

java - 在等待另一个变量初始化时,如何执行执行某些操作的方法?

kubernetes - 带有Ingress设置的GKE始终会显示状态为“不健康”

java - 将视频文件 (.mp4) 集成到 docx/doc 中

SSH 失败,错误代码 "no error"

google-cloud-platform - 如何利用多个 Google Cloud TPU 训练单个模型

google-bigquery - 如何将 Google Cloud SQL 与 Google Big Query 集成