java - 如何创建在固定时间间隔内触发一次且仅触发一次的流式Beam管道

标签 java google-cloud-dataflow apache-beam

我需要创建一个 Apache Beam (Java) 流作业,该作业应每 60 秒启动一次(且仅启动一次)。

我通过使用GenerateSequence、Window 和Combine,使用DirectRunner 使其正常工作。

但是,当我在 Google Dataflow 上运行它时,有时它会在 60 秒窗口内触发多次。我猜这与延迟和乱序消息有关。

Pipeline pipeline = Pipeline.create(options);
pipeline
    // Jenerate a tick every 15 seconds
    .apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(15)))
    // Just to check if individual ticks are being generated once every 15 second
    .apply(ParDo.of(new DoFn<Long, Long>() {
            @ProcessElement
            public void processElement(@Element Long tick, OutputReceiver<Long> out) {
                ZonedDateTime currentInstant = Instant.now().atZone(ZoneId.of("Asia/Jakarta"));
                LOG.warn("-" + tick + "-" + currentInstant.toString());
                out.output(word);
            }
        }
    ))
    // 60 Second window
    .apply("Window", Window.<Long>into(FixedWindows.of(Duration.standardSeconds(60))))
    // Emit once per 60 second 
    .apply("Cobmine window into one", Combine.globally(Count.<Long>combineFn()).withoutDefaults())
    .apply("START", ParDo.of(new DoFn<Long, ZonedDateTime>() {
            @ProcessElement
            public void processElement(@Element Long count, OutputReceiver<ZonedDateTime> out) {
                ZonedDateTime currentInstant = Instant.now().atZone(ZoneId.of("Asia/Jakarta"));
                // LOG just to check
                // This log is sometimes printed more than once within 60 seconds
                LOG.warn("x" + count + "-" + currentInstant.toString());
                out.output(currentInstant);
            }
        }
    ));

Double run at 12:46 and 12:48

它在大多数情况下都有效,除了随机每 5 或 10 分钟一次,我在同一分钟内看到两个输出。如何确保上面的“START”每 60 秒运行一次?谢谢。

最佳答案

简短回答:目前还不能,Beam 模型专注于事件时处理和后期数据的正确处理。

解决方法:您可以定义一个处理时间计时器,但您必须手动处理计时器和延迟数据的输出和处理,see thisthis .

更多详细信息:

Beam 中的窗口和触发器通常在事件时间中定义,而不是在处理时间中定义。这样,如果您在发出窗口结果后有延迟数据,则延迟数据仍然会出现在正确的窗口中,并且可以为该窗口重新计算结果。梁模型允许您表达该逻辑,并且其大部分功能都是为此量身定制的。

这也意味着通常不需要 Beam 管道在某个特定的现实时间(例如说“根据事件本身中的数据聚合属于某个窗口的事件,然后每分钟输出该窗口”这样的说法是没有意义的。 Beam runner 聚合窗口的数据,可能会等待较晚的数据,然后在它认为正确时立即发出结果。数据准备好发出的条件由触发器指定。但这只是 - 当窗口数据准备好发出时的条件,它实际上并不强制运行器发出它。因此,运行程序可以在满足触发条件后的任何时间点发出它,并且结果将是正确的,即,如果自满足计时器条件以来有更多事件到达,则仅处理属于具体窗口的事件在那个窗口中。

事件时间窗口不能与处理时间触发一起使用,并且 Beam 中没有方便的原语(触发器/窗口)来处理存在延迟数据的处理时间。在此模型中,如果您使用仅触发一次的触发器,则会丢失最新数据,并且仍然无法定义强大的处理时间触发器。要构建这样的东西,您必须能够指定诸如现实生活中的时间点之类的内容,从该时间点开始测量处理时间,并且您将必须处理不同处理时间和可能在整个过程中发生的延迟的问题。大量 worker 机器。目前这还不是 Beam 的一部分。

Beam 社区做出了一些努力来实现此用例,例如sink triggersretractions这将允许您在事件时间空间中定义管道,但无需复杂的事件时间触发器。结果可以立即更新/重新计算并发出,或者可以在接收器处指定触发器,例如“我希望输出表每分钟更新一次”。结果将自动更新并重新计算最新数据,无需您的参与。但目前这些努力还远未完成,因此您目前最好的选择是使用 existing triggers 之一或使用 timers 手动处理所有内容。

关于java - 如何创建在固定时间间隔内触发一次且仅触发一次的流式Beam管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57265163/

相关文章:

java - 如何在 Google Dataflow 中组合两组不同的标识符?

python - 慢慢改变 BigQuery 的查找缓存 - Dataflow Python Streaming SDK

python - Google Cloud DataFlow 无法将文件写入临时位置

java - 单例或连接池以获得高性能?

java - java中for循环数组只处理一个元素?

google-bigquery - 通过加载作业(非流式)插入 BigQuery

python - 通过 Airflow 中的 PythonVirtualenvOperator 成功运行多次数据流管道

java - 如何获取元素 ID 中的属性

java - 有没有更好的方法来为 java 编写这段代码?还是让它更干净?

python - 在Python中使用Google数据流从Bigquery到Bigtable数据传输