java - 是否可以在用Python编写的Dataflow流管道中导入Java方法 `wrapBigQueryInsertError`?

标签 java python google-bigquery google-cloud-dataflow

我正在尝试使用 Python3 创建一个 Dataflow 流式传输管道,该管道从 Pub/Sub 主题读取消息,最终“从头开始”将它们写入 BigQuery 表上。我在名为 PubSubToBigQuery.java 的 Dataflow Java 模板中看到过(执行我正在寻找的内容)第三步中的一段代码,用于处理那些转换为表行的 Pub/Sub 消息,当您尝试将它们插入 BigQuery 表时,这些消息会失败。最后,在步骤 4 和 5 的代码片段中,这些代码片段被展平并插入到错误表中:

  • 第 3 步:
PCollection<FailsafeElement<String, String>> failedInserts =
        writeResult
            .getFailedInsertsWithErr()
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
            .setCoder(FAILSAFE_ELEMENT_CODER);
  • 第 4 步和第 5 步
    PCollectionList.of(
            ImmutableList.of(
                convertedTableRows.get(UDF_DEADLETTER_OUT),
                convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            ErrorConverters.WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    ValueProviderUtils.maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());


    failedInserts.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrors.newBuilder()
            .setErrorRecordsTable(
                ValueProviderUtils.maybeUseDefaultDeadletterTable(
                    options.getOutputDeadletterTable(),
                    options.getOutputTableSpec(),
                    DEFAULT_DEADLETTER_TABLE_SUFFIX))
            .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
            .build());

为了做到这一点,我怀疑实现这一点的关键在于模板中第一个导入的库:

package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;

这个方法在Python中可用吗?

如果没有,有某种方法可以在 Python 中执行相同的操作,即不检查应插入的记录字段的结构和数据类型是否与 BigQuery 表所期望的相对应?

这种解决方法会严重减慢我的流传输管道的速度。

最佳答案

在 Beam Python 中,执行流式 BigQuery 写入时,转换会返回 BigQuery 写入期间失败的行。请参阅https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1248

所以你可以用与Java模板相同的方式处理这些。

关于java - 是否可以在用Python编写的Dataflow流管道中导入Java方法 `wrapBigQueryInsertError`?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58995447/

相关文章:

java - Mockito verify + any 行为不可预测

java - Android 应用程序 ListView 启动太慢

python - 在请求上下文之外设置 Jinja 环境全局变量

python - 如何将 Python 3.6 设置为 Zappa 的默认版本?

Python + Flask + Mysql 错误 2006(服务器已经消失)

sql - 是否可以在表之间创建关系?

java - 仅当方法位于同一类中时才存储值

使用 Executors.newCachedThreadPool 时 Java 驻留内存不断增长

google-bigquery - 如何设置 Big Query 表的过期时间?

sql - 尝试根据整数列表过滤行时,BigQuery 没有匹配的签名