java - @OnTimer 方法在触发时接收空引用

标签 java google-cloud-platform google-cloud-dataflow apache-beam

我最近一直在处理一个让我发疯的问题,因为它只是在部署到 Dataflow 中后才会发生,但从来没有在本地运行过,一切都完美无缺。仅供引用,我正在使用 Apache Beam 2.9.0

我正在定义一个 DoFn 步骤,它将事件缓冲一段时间(例如 5 分钟),然后在该时间之后触发一些逻辑。

@StateId("bufferSize")
private final StateSpec<ValueState<Integer>> bufferSizeSpec =
  StateSpecs.value(VarIntCoder.of());

@StateId("eventsBuffer")
private final StateSpec<BagState<String>> eventsBufferSpec =
  StateSpecs.bag(StringUtf8Coder.of());

@TimerId("trigger")
private final TimerSpec triggerSpec = 
  TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

我有我的 processElement 逻辑来添加传入事件...

@ProcessElement
public void processElement(
    ProcessContext processContext,
    @StateId("bufferSize") ValueState<Integer> bufferSize,
    @StateId("eventsBuffer") BagState<String> eventsBuffer,
    @TimerId("trigger") Timer triggerTimer) {

  triggerTimer.offset(Duration.standardMinutes(1)).setRelative();
  int size = ObjectUtils.firstNonNull(bufferSize.read(), 0);
  eventsBuffer.add(processContext.element().getValue());
  bufferSize.write(++size);
}

然后我的触发器...

@OnTimer("trigger")
public void onExpiry(
    @StateId("bufferSize") ValueState<Integer> bufferSize,
    @StateId("eventsBuffer") BagState<String> eventsBuffer) throws Exception {

  doSomethingHere();
}

每当执行onExpiry时,它接收到的参数都是null和0。

集群方面会发生什么?

编辑:

DoFn 之前使用的窗口。

.apply(
  "1min Window",
  Window
    .<KV<String, String>>into(
        FixedWindows.of(Duration.standardMinutes(1)))
    .triggering(AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardSeconds(1)))
    .withAllowedLateness(Duration.ZERO)
    .accumulatingFiredPanes())

最佳答案

需要注意的是,当窗口过期时,状态将被 GC 回收。

因此,对于 key-1,您的 Bag 对象将包含 {key-1, TimeInterval-1} 、 {key-1,TimeInterval-2} 等的数据。

如果您希望输入值和计时器之间具有强语义,您可能需要探索 EventTime 计时器的使用。

关于java - @OnTimer 方法在触发时接收空引用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56988591/

相关文章:

java - 分离android应用程序的功能和设计

python - Google App Engine - YouTube Python 客户端库错误

google-cloud-platform - 长时间运行的 Dataflow 作业失败,用户代码中没有错误

google-bigquery - 通过加载作业(非流式)插入 BigQuery

java - 如何查找第三方工具的 java 已弃用 api?

java - 将 Java 泛型参数类型限制为最终类的列表

go - 删除集群时删除所有关联的永久磁盘

google-cloud-platform - GCP Kubeflow 和 GCP cloud composer 有什么区别?

python - 数据流: Look up a previous event in an event stream

java - 如何在 Google map 上的标记上方添加文本?