google-bigquery - 如何在 BigQuery 插入错误时崩溃/停止 DataFlow Pub/Sub 摄取

标签 google-bigquery google-cloud-dataflow google-cloud-pubsub

我正在寻找一种方法,使 Google DataFlow 作业在(特定)异常发生时停止从 Pub/Sub 摄取。

来自 Pub/Sub 的事件是通过 PubsubIO.Read.Bound<TableRow> 读取的 JSON使用 TableRowJsonCoder并直接流式传输到 BigQueryBigQueryIO.Write.Bound .
(中间有一个 ParDo,它改变了一个字段的内容和一些按天发生的自定义分区,但这应该与此目的无关。)

当从 PubSub 摄取的事件/行中的字段不是目标 BigQuery 表中的列时,DataFlow 作业在运行时记录 IOExceptions,声称它无法插入行,但似乎确认这些消息并继续运行。

我想要做的是停止从 Pub/Sub 摄取消息和/或使 Dataflow 作业崩溃,以便警报可以基于最旧的未确认消息的年龄。至少我想确保那些未能插入 BigQuery 的 Pub/Sub 消息没有被确认,以便我可以解决问题,重新启动 Dataflow 作业并再次使用这些消息。

我知道这里描述了一种处理错误输入的建议解决方案:https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow

我也知道 Apache Beam 上的这个 PR 允许插入没有违规字段的行:
https://github.com/apache/beam/pull/1778

但是,在我的情况下,我并不是真的想防止错误输入,而是防止程序员错误,即新字段被添加到推送到 Pub/Sub 的 JSON 消息中,但相应的 DataFlow 作业没有更新。所以我并没有真正有错误的数据,我只是想当程序员犯了一个错误,在更改消息格式之前没有部署新的 Dataflow 作业时崩溃。

我认为可以(类似于博客文章解决方案)创建一个自定义 ParDo验证每一行并抛出一个未被捕获并导致崩溃的异常。

但理想情况下,我只想拥有一些不处理插入错误并记录它的配置,而只是使作业崩溃或至少停止摄取。

最佳答案

您可以拥有一个带有 DoFn 的 ParDo,它位于 BQ 写入之前。 DoFn 将负责每 X 分钟获取一次输出表模式,并验证要写入的每条记录是否与预期的输出模式匹配(如果不匹配则抛出异常)。

Old Pipeline:
PubSub -> Some Transforms -> BQ Sink

New Pipeline:
PubSub -> Some Transforms -> ParDo(BQ Sink Validator) -> BQ Sink

这样做的好处是,一旦有人修复了输出表架构,管道就会恢复。你会想要抛出一个很好的错误消息,说明传入的 PubSub 消息有什么问题。

或者,您可以使用 BQ Sink Validator而是将消息输出到 PubSub DLQ(监控其大小)。在操作上,您必须更新表,然后将 DLQ 作为输入重新摄取。这样做的优点是只有坏消息会阻止管道执行。

关于google-bigquery - 如何在 BigQuery 插入错误时崩溃/停止 DataFlow Pub/Sub 摄取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44462475/

相关文章:

java - apache.beam.sdk.schemas.Schema.FieldType 中数值的等效数据类型是什么

java - Jetty ALPN/NPN 尚未正确配置

google-bigquery - 如何在 BigQuery 中将分区表复制到另一个分区表

google-analytics - 依靠一个自定义维度,然后使用BigQuery对其进行分组

google-cloud-dataflow - 防止 Google Dataflow 融合的最佳方法?

google-app-engine - 谷歌云数据流 ETL(数据存储 -> 转换 -> BigQuery)

java - 从 Java 使用 Google Cloud PubSub 模拟器

c# - 无法在 C# 中运行 Google Cloud PubSub,DLL 问题

json - BigQuery 在导入 JSON 时处理缺失字段和未知/额外字段

sql - BigQuery:使用标准SQL从Google Analytics(分析)数据中选择星期