java - @OnTimer 在窗口后不触发

标签 java google-cloud-dataflow apache-beam

我一直在尝试使用 Apache Beam 的计时器,但无法触发它们。

据我所知,您可以在 DoFn 中按以下方式定义计时器。

@TimerId("expiry")
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

我选择了 TimeDomain.PROCESSING_TIME,因为我的事件没有分配时间戳,并且希望在窗口完成后立即触发计时器的执行。

        .apply(
             "FixedWindow",
            Window.<KV<String, GenericRecord>>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes()
        )
        .apply("ExecuteAfterWindowFn", ParDo.of(new ExecuteAfterWindowFn()));

我期望下面的计时器,它位于 DoFn 内,基本上在缓冲区内累积对象,并在窗口完成后继续管道并处理事件集...

        @OnTimer("expiry")
    public void onExpiry(
        OnTimerContext context,
        @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
        @StateId("buffered") BagState<GenericRecord> bufferedState) throws IOException {
        flush(context, bufferedState, bufferedSizeState);
    }

...执行成功。我是否遗漏了某些内容或不了解计时器在 Apache Beam 中的工作原理?

最佳答案

您可以检查[1],其中有计时器用法的示例。

您需要设置计时器何时触发[2],其中可能是错过的地方。

[1] https://beam.apache.org/blog/2017/08/28/timely-processing.html

[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java#L53

关于java - @OnTimer 在窗口后不触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56869241/

相关文章:

java - 使用 Cloud Dataflow 运行外部库

java - 使用 TextIO 和 ValueProvider 创建数据流模板时出错

java - 在 IntelliJ IDEA 下使用 Lombok 时无法编译项目

java - maven war插件如何配置多个输出目录?

java - 程序化 Servlet 3.0 JSP jsp-property-group 配置

google-cloud-dataflow - GroupBy/Combine 之后如何创建数据流包?

java - 单选按钮被取消选中并在 ListView 滚动中自动选中

python - 如何在私有(private)子网上运行Dataflow python?

java - 尝试使用 DataflowRunner 时出现 ClassNotFound 异常

java - 在 Apache Beam 中使用 BigQuery 处理空 PCollection