google-cloud-platform - BigQueryIO - 不能将 DynamicDestination 与 CREATE_IF_NEEDED 一起用于无界 PCollection 和 FILE_LOADS

标签 google-cloud-platform google-bigquery google-cloud-dataflow apache-beam

我的工作流程:KAFKA -> 数据流流 -> BigQuery

鉴于低延迟对我来说并不重要,我使用 FILE_LOADS 来降低成本。我将 BigQueryIO.Write 与 DynamicDestination 一起使用(每小时一个新表,以当前小时为后缀)。

这个 BigQueryIO.Write 配置如下:

.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(triggeringFrequency)
.withNumFileShards(100)

第一个表已成功创建并写入。但随后从未创建以下表格,我得到这些异常(exception):
(99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job with id prefix 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_00001_00023, reached max retries: 3, last failed load job: {
  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_NEVER",
      "destinationTable" : {
        "datasetId" : "dev_mydataset",
        "projectId" : "myproject-id",
        "tableId" : "mytable_20180302_16"
      },

对于第一个表,所使用的 CreateDisposition 是指定的 CREATE_IF_NEEDED,但随后不考虑此参数,默认情况下使用 CREATE_NEVER。

我还创建了 issue在 JIRA 上。

最佳答案

根据documentation of Apache Beam's BigQueryIO ,方法BigQueryIO.Write.CreateDisposition要求使用 the precondition .withSchema() 提供表模式当使用 CREATE_IF_NEEDED 时。

正如 Dataflow documentation 中所述:

Note that if you specify CREATE_IF_NEEDED as the CreateDisposition and you don't supply a TableSchema, the transform may fail at runtime with a java.lang.IllegalArgumentException if the target table does not exist.



文档说明的错误与您收到的错误不同(您得到 java.lang.RuntimeException ),但根据 BigQueryIO.Write()您共享的配置,您没有指定任何表架构,因此,如果缺少表,作业很容易失败。

因此,作为解决问题的第一个措施,您应该创建表架构 TableSchema()匹配您将加载到 BQ 中的数据,然后使用前置条件 .withSchema(schema)因此:
List<TableFieldSchema> fields = new ArrayList<>();
// Add fields like:
fields.add(new TableFieldSchema().setName("<FIELD_NAME>").setType("<FIELD_TYPE>"));
TableSchema schema = new TableSchema().setFields(fields);

// BigQueryIO.Write configuration plus:
    .withSchema(schema)

关于google-cloud-platform - BigQueryIO - 不能将 DynamicDestination 与 CREATE_IF_NEEDED 一起用于无界 PCollection 和 FILE_LOADS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49241650/

相关文章:

google-cloud-platform - 无法在 GCP 上启用 API(用户角色=所有者 && 结算帐号已关联)

google-cloud-platform - 为 Pubsub 到 Bigquery 流构建此 GCP 数据流示例时出错

mysql - 将 mysql 与 google 的 bigquery 链接

java - 尝试通过 Dataflow 访问 Google Cloud Datastore 时出现 403 错误

google-bigquery - 从 Dataflow 到 BigQuerySource 的简单查询会引发异常

nlp - FastText 无法打开以从存储桶加载

node.js - 使用 Google 的 Text to Speech API 同时执行多个请求时仅获取最后一个请求的音频

ruby-on-rails - 访问部署在 Google Cloud Run 中的应用程序的 Rails 控制台

kubernetes - 如何解决 Google Cloud 上的 "All hosts are taken by other resources"?

google-bigquery - BigQuery - 计算滑动时间范围内的事件数