google-bigquery - Reading an Avro file and writing it to a BigQuery table

Tags: google-bigquery google-cloud-storage google-cloud-dataflow apache-beam

My goal is to read Avro file data from Cloud Storage and write it to a BigQuery table using Java. It would be great if someone could share a code snippet or some ideas for reading Avro-format data and writing it to a BigQuery table with Cloud Dataflow.

Best Answer

I see two possible approaches:

  • Using Dataflow:

        import com.google.api.services.bigquery.model.TableFieldSchema;
        import com.google.api.services.bigquery.model.TableReference;
        import com.google.api.services.bigquery.model.TableRow;
        import com.google.api.services.bigquery.model.TableSchema;
        import com.google.common.collect.ImmutableList;
        import org.apache.avro.Schema;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.util.Utf8;
        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.io.AvroIO;
        import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
        import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
        import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
        import org.apache.beam.sdk.options.PipelineOptions;
        import org.apache.beam.sdk.options.PipelineOptionsFactory;
        import org.apache.beam.sdk.transforms.MapElements;
        import org.apache.beam.sdk.values.PCollection;
        import org.apache.beam.sdk.values.TypeDescriptor;

        public class AvroToBigQuery {
          public static void main(String[] args) {
            PipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().create();
            Pipeline p = Pipeline.create(options);

            // Read an Avro file using an inline schema.
            // Alternatively, read the schema from a file.
            // https://beam.apache.org/releases/javadoc/2.11.0/index.html?org/apache/beam/sdk/io/AvroIO.html
            Schema avroSchema = new Schema.Parser().parse(
                "{\"type\": \"record\", "
                    + "\"name\": \"quote\", "
                    + "\"fields\": ["
                    + "{\"name\": \"source\", \"type\": \"string\"},"
                    + "{\"name\": \"quote\", \"type\": \"string\"}"
                    + "]}");
            PCollection<GenericRecord> avroRecords = p.apply(
                AvroIO.readGenericRecords(avroSchema).from("gs://bucket/quotes.avro"));

            // Convert Avro GenericRecords to BigQuery TableRows. It's probably
            // better to use Avro-generated classes instead of manually casting
            // types (see the sketch after this snippet).
            // https://beam.apache.org/documentation/io/built-in/google-bigquery/#writing-to-bigquery
            PCollection<TableRow> bigQueryRows = avroRecords.apply(
                MapElements.into(TypeDescriptor.of(TableRow.class))
                    .via(
                        (GenericRecord elem) ->
                            new TableRow()
                                .set("source", ((Utf8) elem.get("source")).toString())
                                .set("quote", ((Utf8) elem.get("quote")).toString())));

            // Define a matching BigQuery schema.
            // https://cloud.google.com/bigquery/docs/schemas
            TableSchema bigQuerySchema =
                new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema()
                                .setName("source")
                                .setType("STRING"),
                            new TableFieldSchema()
                                .setName("quote")
                                .setType("STRING")));

            // Write the rows, creating the table if needed and truncating any
            // existing contents.
            bigQueryRows.apply(BigQueryIO.writeTableRows()
                .to(new TableReference()
                    .setProjectId("project_id")
                    .setDatasetId("dataset_id")
                    .setTableId("avro_source"))
                .withSchema(bigQuerySchema)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

            p.run().waitUntilFinish();
          }
        }

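    As the comment in the snippet notes, Avro-generated specific-record classes
    avoid the manual Utf8 casts. A minimal sketch of that variant, assuming a
    hypothetical Quote class generated from the same schema (e.g., by the
    avro-maven-plugin):

        // Quote is a hypothetical SpecificRecord class generated from the
        // schema above; its typed getters make the Utf8 casts unnecessary.
        PCollection<Quote> quotes = p.apply(
            AvroIO.read(Quote.class).from("gs://bucket/quotes.avro"));

        PCollection<TableRow> rows = quotes.apply(
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via((Quote q) ->
                    new TableRow()
                        .set("source", q.getSource().toString())
                        .set("quote", q.getQuote().toString())));
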
  • Load the data into BigQuery directly, without Dataflow. See this documentation: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro (a minimal sketch using the BigQuery Java client follows below).
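
    A minimal sketch of the load-job approach, using the google-cloud-bigquery
    Java client; the bucket, dataset, and table names are placeholders carried
    over from the snippet above:

        import com.google.cloud.bigquery.BigQuery;
        import com.google.cloud.bigquery.BigQueryOptions;
        import com.google.cloud.bigquery.FormatOptions;
        import com.google.cloud.bigquery.Job;
        import com.google.cloud.bigquery.JobInfo;
        import com.google.cloud.bigquery.LoadJobConfiguration;
        import com.google.cloud.bigquery.TableId;

        public class LoadAvroIntoBigQuery {
          public static void main(String[] args) throws InterruptedException {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

            // For Avro loads, BigQuery infers the table schema from the Avro
            // schema embedded in the file, so no explicit schema is needed.
            LoadJobConfiguration loadConfig =
                LoadJobConfiguration.newBuilder(
                        TableId.of("dataset_id", "avro_source"),
                        "gs://bucket/quotes.avro")
                    .setFormatOptions(FormatOptions.avro())
                    .build();

            // Run the load job and block until it finishes.
            Job job = bigquery.create(JobInfo.of(loadConfig)).waitFor();
            if (job == null || job.getStatus().getError() != null) {
              throw new RuntimeException("Load job failed: "
                  + (job == null ? "job no longer exists" : job.getStatus().getError()));
            }
          }
        }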
