我一直在尝试使用 Apache Beam 的计时器,但无法触发它们。
据我所知,您可以在 DoFn 中按以下方式定义计时器。
@TimerId("expiry")
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
我选择了 TimeDomain.PROCESSING_TIME
,因为我的事件没有分配时间戳,并且希望在窗口完成后立即触发计时器的执行。
.apply(
"FixedWindow",
Window.<KV<String, GenericRecord>>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes()
)
.apply("ExecuteAfterWindowFn", ParDo.of(new ExecuteAfterWindowFn()));
我期望下面的计时器,它位于 DoFn 内,基本上在缓冲区内累积对象,并在窗口完成后继续管道并处理事件集...
@OnTimer("expiry")
public void onExpiry(
OnTimerContext context,
@StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
@StateId("buffered") BagState<GenericRecord> bufferedState) throws IOException {
flush(context, bufferedState, bufferedSizeState);
}
...执行成功。我是否遗漏了某些内容或不了解计时器在 Apache Beam 中的工作原理?
最佳答案
您可以检查[1],其中有计时器用法的示例。
您需要设置计时器何时触发[2],其中可能是错过的地方。
[1] https://beam.apache.org/blog/2017/08/28/timely-processing.html
关于java - @OnTimer 在窗口后不触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56869241/