google-cloud-dataflow - 具体版本 : PubSub/Dataflow acknowledgement of unbounded data

标签 google-cloud-dataflow apache-beam google-cloud-pubsub

我有一个项目,它有一个 apache beam 管道,其依赖项的设置方式使得我必须使用 PubSub 的 0.20.0-beta 版。 此管道一直运行(无限制)。

[+] 问题: PubSub 消息每 30 分钟左右重复一次。

[+] 我尝试过的: 我读过许多解决方案,其中提到数据流运行器如何具有确认发生的检查点。我还读过,使用 GroupByKey 等 PTransform 可以更快地确认这些消息。所以我尝试了窗口化、按键触发和分组,但我仍然从 PubSub 收到重复的消息。

[+] 问题: 我究竟做错了什么?为什么消息没有被确认? (如果我理解正确,它不会在管道结束执行之前被确认??但我的管道需要很长时间,如何提前确认?)

这是特定于 0.20.0-beta 的“版本”错误,还是我应该能够将 PubsubIO.Reader 与窗口和触发一起使用以便更早确认?

[+]代码:

窗口时间为 10 秒,PubSub ack 截止时间为 60 秒。

     .apply("Listen_To_PubSub", PubsubIO.readStrings().fromSubscription(subscription))
            .apply("Windowing", Window.<String> into(window).triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeLimit)).withAllowedLateness(Duration.ZERO).discardingFiredPanes())
            .apply("DeleteFromBQ", ParDo.of(new DeleteFromBQ()))
            .apply("Mapping", ParDo.of(new Mapping()))
            .apply("GroupByKey", GroupByKey.<String,String>create())
            .apply("Acknowledge", ParDo.of(new Grouped()))
            .apply("DoSomething1", ParDo.of(new DoSomething1()))
            .apply("Flatten_Iterable", Flatten.iterables())
            .apply("DoSomething2", ParDo.of(new DoSomething2()))
            .apply("DoSomething3", ParDo.of(new DoSomething3()))
            .apply("DoSomething4", ParDo.of(new DoSomething4()))
            .apply("Write_To_BigQuery", BigQueryIO.writeTableRows()
                    .to(output)
                    .withSchema(schema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            );

提前致谢!任何输入表示赞赏。

最佳答案

当您应用如此多的转换时,您似乎超过了 60 秒的确认期限。要查看需要多长时间,我建议使用 Logging Pipeline Messages .我认为您可能需要尽快移动确认。

您可以做的另一件事是使用更高的机器类型来更快地处理消息。

关于google-cloud-dataflow - 具体版本 : PubSub/Dataflow acknowledgement of unbounded data,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51271594/

相关文章:

google-cloud-platform - 我们可以将 webhook 与 Google PubSub 连接吗?

java - 将 STRUCT 数组从 Dataflow 写入大查询

google-cloud-dataflow - 为什么在 Dataflow beta 中 #sideInput() 方法从 Context 移至 ProcessContext

google-cloud-dataflow - 如何为在 Direct Runner 上运行的 apache 光束管道设置日志级别

python - 为什么我需要洗牌我的 PCollection 才能在 Cloud Dataflow 上自动缩放?

java - Google pubsub 流量控制

python - 使用 google-api-python-client 拉超时

google-bigquery - 与Google Pub/Sub + Dataflow相比,直接流入BigQuery的利弊

google-cloud-dataflow - 数据流中的 Cloud Bigtable 多前缀扫描

python - 在美国位置未找到数据集