我有一个管道如下:
Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime
.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(options.getWindowDuration()))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes();
PCollectionTuple productProcessorPT = pipeline
.apply(READ_PRODUCT_FROM_PUBSUB.getName(), PubsubIO.readStrings()
.fromSubscription(options.getProductSubscription()))
.apply(PRODUCT_WINDOW.getName(), fixedWindow)
.apply(PROCESS_PRODUCT.getName(), ParDo.of(new ProductProcessor()))
.apply(GROUP_PRODUCT_DATA.getName(), GroupByKey.create())
.apply(COMBINE_PRODUCT_DATA.getName(), ParDo.of(new ProductCombiner())
.withOutputTags(KV_STRING_OBJECTNODE, TupleTagList.of(PIPELINE_ERROR)));
我想要实现的是设置一个窗口/触发器,每 60 秒收集一次数据,然后将数据发送到下一个转换。我怎样才能做到这一点?我不关心事件时间戳。
上面的代码每 60 秒将数据发送到下一个转换,但即使没有新数据进入管道,它也会不断触发/发送(相同的)数据。不确定为什么会发生这种情况?
最佳答案
您可以删除触发并仅使用 FixedWindows
如下所示每 60 秒发出一次记录
Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())));
这将使用默认触发和处理延迟事件,这基本上意味着数据在窗口末尾发出,并且所有延迟事件都将被忽略。
关于java - Apache Beam/Java,如何设置窗口/触发器为每个窗口仅发送一次数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61037001/