google-bigquery - 带有架构更改的 Airflow Pipeline CSV 到 BigQuery

标签 google-bigquery airflow

背景

我需要设计一个 Airflow 管道来将 CSV 文件加载到 BigQuery 中。

我知道 CSV 的架构经常变化。加载第一个文件后,模式可能是

编号 | ps_1 | ps_1_value

当我加载第二个文件时,它可能看起来像

编号 | ps_1 | ps_1_值 | ps_1 | ps_2_value.

问题

处理此问题的最佳方法是什么?


我首先想到的是

  1. 加载第二个文件
  2. 将架构与当前表进行比较
  3. 更新表格,添加两列 (ps_2, ps_2_value)
  4. 插入新行

我会在 PythonOperator 中执行此操作。

如果文件 3 出现并且看起来像 id | ps_2 | ps_2_value 我会填写缺失的列并进行插入。

感谢您的反馈。

最佳答案

加载两个先前的文件 example_data_1.csvexample_data_2.csv 后,我可以看到字段被插入到正确的列中,并根据需要添加新列.

编辑:灯泡时刻意识到 schema_update_options 存在。看这里:https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.SchemaUpdateOption.html

csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id='csv_to_bigquery',
    google_cloud_storage_conn_id='google_cloud_default',
    bucket=airflow_bucket,
    source_objects=['data/example_data_3.csv'],
    skip_leading_rows=1,
    bigquery_conn_id='google_cloud_default',    
    destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
    autodetect=True,
    dag=dag
)

enter image description here

关于google-bigquery - 带有架构更改的 Airflow Pipeline CSV 到 BigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60749503/

相关文章:

apache-spark - 从 Airflow SparkSubmitOperator 提交 Spark 应用程序时如何设置 YARN 队列

kubernetes - Helm stable/airflow - 使用 Helm 图表失败的具有共享持久卷的 Airflow 部署的自定义值

sql - 查询结果中的 google-bigquery 格式日期为 mm/dd/yyyy

hadoop - BigQuery是否支持加载HDFS样式的分区数据?

google-cloud-platform - 多个项目的 BigQuery 服务帐号配置

python - 将参数从 BranchPythonOperator 传递到 PythonOperator

google-bigquery - 使用 BigQuery LegacySQL 处理数组

sql - 上个月的最后一天 - BigQuery

python - Airflow - ModuleNotFoundError : No module named 'kubernetes'

python - 在 Google Cloud Composer 中浏览 DAG 任务信息时,Airflow 抛出与时区相关的 TypeError