google-bigquery - Apache Beam/Google Dataflow PubSub to BigQuery Pipeline: Handling Insert Errors and Unexpected Retry Behavior

Tags: google-bigquery google-cloud-dataflow apache-beam google-cloud-pubsub

I have downloaded a copy of the Pub/Sub to BigQuery Dataflow template from Google's GitHub repository, and I am running it on my local machine using the direct-runner.

In testing, I confirmed that the template only writes failures to the "deadletter" table if an error occurs during UDF processing or during the conversion from JSON to TableRow.

I would also like to handle failures that occur while inserting into BigQuery more gracefully, by sending them to a separate TupleTag so that they too can be routed to the deadletter table or another output for review and handling. Currently, when executing with the dataflow-runner, these errors are only written to Stackdriver logs and are retried indefinitely until the underlying problem is resolved.

Question one: When testing locally and publishing messages whose format does not match the destination table's schema, the insert is retried 5 times and then the pipeline crashes with a RuntimeException containing the HTTP response from the call to the Google API. I believe this behavior is set inside BigQueryServices.Impl here:

private static final FluentBackoff INSERT_BACKOFF_FACTORY =
        FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);

However, based on Google's documentation,

"When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall."

and the fact that Beam's PubsubIO is used to

create and consume unbounded PCollections

my impression was that streaming mode should be enabled by default when reading from Pub/Sub. I even added the STREAMING_INSERTS method to the writeTableRows() call, but it did not affect this behavior.

    .apply(
        "WriteSuccessfulRecords",
        BigQueryIO.writeTableRows()
            .withMethod(Method.STREAMING_INSERTS)
  1. Is this behavior affected by the runner I am using? If not, where is my understanding flawed?

Question two:

  • Is there a performance difference between using BigQueryIO.write and BigQueryIO.writeTableRows?
  • I ask because I don't see how to capture insert-related errors without creating my own static class that overrides the expand method and uses a ParDo and DoFn, where I could add custom logic to create separate TupleTags for successful records and failed records, similar to how it is done in the JavascriptTextTransformer for FailsafeJavascriptUdf (a rough sketch of such a multi-output DoFn is shown below).
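
For reference, a multi-output DoFn of the kind described above might look roughly like the sketch below. The tag names, element types, and failure criterion are illustrative assumptions, not code from the template.

    // Assumes the Beam SDK classes (DoFn, ParDo, TupleTag, TupleTagList,
    // PCollectionTuple) and com.google.api.services.bigquery.model.TableRow
    // are imported.

    // Hypothetical tags for routing records to a success and a failure output.
    static final TupleTag<TableRow> SUCCESS_TAG = new TupleTag<TableRow>() {};
    static final TupleTag<TableRow> FAILURE_TAG = new TupleTag<TableRow>() {};

    static class TagRowsFn extends DoFn<TableRow, TableRow> {
      @ProcessElement
      public void processElement(ProcessContext context) {
        TableRow row = context.element();
        try {
          // Hypothetical validation / insert-preparation logic would go here.
          context.output(row);               // main output -> SUCCESS_TAG
        } catch (Exception e) {
          context.output(FAILURE_TAG, row);  // side output -> FAILURE_TAG
        }
      }
    }

    // Applying it to an existing PCollection<TableRow> named "rows":
    PCollectionTuple tagged =
        rows.apply(
            "TagRows",
            ParDo.of(new TagRowsFn())
                .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));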

Update:

    public static PipelineResult run(DirectOptions options) {

        options.setRunner(DirectRunner.class);

        Pipeline pipeline = Pipeline.create(options);

        // Register the coder for pipeline
        FailsafeElementCoder<PubsubMessage, String> coder =
            FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

        CoderRegistry coderRegistry = pipeline.getCoderRegistry();
        coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

        PCollectionTuple transformOut =
            pipeline
                // Step #1: Read messages in from Pub/Sub
                .apply(
                    "ReadPubsubMessages",
                    PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))

                // Step #2: Transform the PubsubMessages into TableRows
                .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

        WriteResult writeResult = null;

        try {
            writeResult =
                transformOut
                    .get(TRANSFORM_OUT)
                    .apply(
                        "WriteSuccessfulRecords",
                        BigQueryIO.writeTableRows()
                            .withMethod(Method.STREAMING_INSERTS)
                            .withoutValidation()
                            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                            .to("myproject:MyDataSet.MyTable"));
        } catch (Exception e) {
            System.out.print("Cause of the Standard Insert Failure is: ");
            System.out.print(e.getCause());
        }

        try {
            writeResult
                .getFailedInserts()
                .apply(
                    "WriteFailedInsertsToDeadLetter",
                    BigQueryIO.writeTableRows()
                        .to(options.getOutputDeadletterTable())
                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
        } catch (Exception e) {
            System.out.print("Cause of the Error Insert Failure is: ");
            System.out.print(e.getCause());
        }

        PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
            .and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
            .apply("Flatten", Flatten.pCollections())
            .apply(
                "WriteFailedRecords",
                WritePubsubMessageErrors.newBuilder()
                    .setErrorRecordsTable(
                        maybeUseDefaultDeadletterTable(
                            options.getOutputDeadletterTable(),
                            options.getOutputTableSpec(),
                            DEFAULT_DEADLETTER_TABLE_SUFFIX))
                    .setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
                    .build());

        return pipeline.run();
    }
    

Error:

    Cause of the Error Insert Failure is: null[WARNING] 
    java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
        at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
        at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
        at java.lang.Thread.run(Thread.java:748)
    

Best Answer

In recent versions of Beam, the BigQueryIO.Write transform returns a WriteResult object, which lets you retrieve a PCollection of the TableRows that failed to be inserted into BigQuery. With this, you can easily retrieve the failures, format them into the structure of your deadletter output, and resubmit the records to BigQuery. This removes the need for a separate class to manage successful and failed records.

Below is an example for your pipeline.

    // Attempt to write the table rows to the output table.
    WriteResult writeResult =
        pipeline.apply(
            "WriteRecordsToBigQuery",
            BigQueryIO.writeTableRows()
                .to(options.getOutputTable())
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    
    /*
     * 1) Get the failed inserts
     * 2) Transform to the deadletter table format.
     * 3) Output to the deadletter table.
    */
    writeResult
      .getFailedInserts()
        .apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
        .apply(
            "WriteFailedInsertsToDeadletter",
            BigQueryIO.writeTableRows()
                .to(options.getDeadletterTable())
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
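
The FailedInsertFormatter referenced above is not defined in the snippet; a minimal sketch follows, assuming the deadletter table simply stores each failed row serialized as a JSON string in a payload column alongside a timestamp (the schema and field names here are assumptions, not the template's deadletter schema).

    // Sketch of a formatter DoFn for failed inserts. Assumes Beam's DoFn and the
    // BigQuery TableRow class are imported. The "timestamp" and "payloadString"
    // columns are assumed to already exist in the deadletter table.
    static class FailedInsertFormatter extends DoFn<TableRow, TableRow> {
      @ProcessElement
      public void processElement(ProcessContext context) {
        TableRow failedRow = context.element();
        TableRow deadletterRow =
            new TableRow()
                .set("timestamp", context.timestamp().toString())
                .set("payloadString", failedRow.toString());
        context.output(deadletterRow);
      }
    }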
    

Additionally, to answer your questions:

1. Per the Beam docs, you must set the streaming option to true for the DirectRunner (see the snippet after this list).
2. There should not be a performance difference. In either case you need to transform your input records into TableRow objects; it should make no difference whether you do this beforehand in a ParDo or within a serializable function via BigQueryIO.Write.withFormatFunction.
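
For the first point, a minimal sketch of enabling streaming mode explicitly; StreamingOptions is part of the core Beam SDK, and options here is the DirectOptions instance from your question.

    // Force streaming mode when running on the DirectRunner.
    options.as(StreamingOptions.class).setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);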

Original question on Stack Overflow: https://stackoverflow.com/questions/52044349/
