java - Apache Beam/Java,如何设置窗口/触发器为每个窗口仅发送一次数据

标签 java apache-beam

我有一个管道如下:

Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
      .triggering(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime
            .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(options.getWindowDuration()))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes();

PCollectionTuple productProcessorPT = pipeline
  .apply(READ_PRODUCT_FROM_PUBSUB.getName(), PubsubIO.readStrings()
    .fromSubscription(options.getProductSubscription()))
  .apply(PRODUCT_WINDOW.getName(), fixedWindow)
  .apply(PROCESS_PRODUCT.getName(), ParDo.of(new ProductProcessor()))
  .apply(GROUP_PRODUCT_DATA.getName(), GroupByKey.create())
  .apply(COMBINE_PRODUCT_DATA.getName(), ParDo.of(new ProductCombiner())
    .withOutputTags(KV_STRING_OBJECTNODE, TupleTagList.of(PIPELINE_ERROR)));

我想要实现的是设置一个窗口/触发器,每 60 秒收集一次数据,然后将数据发送到下一个转换。我怎样才能做到这一点?我不关心事件时间戳。

上面的代码每 60 秒将数据发送到下一个转换,但即使没有新数据进入管道,它也会不断触发/发送(相同的)数据。不确定为什么会发生这种情况?

最佳答案

您可以删除触发并仅使用 FixedWindows 如下所示每 60 秒发出一次记录

Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())));

这将使用默认触发和处理延迟事件,这基本上意味着数据在窗口末尾发出,并且所有延迟事件都将被忽略。

关于java - Apache Beam/Java,如何设置窗口/触发器为每个窗口仅发送一次数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61037001/

相关文章:

java - Android Studio:从Github导入Project并重新构建/清理/与Gradle文件同步后的 “cannot resolve symbol R”

google-cloud-platform - Apache 梁 : ReadFromText versus ReadAllFromText

java - 是否可以在从 Pub/Sub 写入 BigQuery 的 Google Cloud Dataflow 管道中捕获丢失的数据集 java.lang.RuntimeException?

python - 在 Apache Beam 中导入 Google Firestore Python 客户端

java - RecyclerViewAdapter OnBindViewHolder 的 ID 为空

java - 解决org.apache.hadoop.conf.Configuration时出错

java - 数据属性的 OWL 类表达式

java - 将参数传递给 JDBC PreparedStatement

java - Apache 光束 : Unable to find registrar for gs

java - Apache Beam - 与无限 PCollection 的集成测试