背景
我需要设计一个 Airflow 管道来将 CSV 文件加载到 BigQuery 中。
我知道 CSV 的架构经常变化。加载第一个文件后,模式可能是
编号 | ps_1 | ps_1_value
当我加载第二个文件时,它可能看起来像
编号 | ps_1 | ps_1_值 | ps_1 | ps_2_value
.
问题
处理此问题的最佳方法是什么?
我首先想到的是
- 加载第二个文件
- 将架构与当前表进行比较
- 更新表格,添加两列 (ps_2, ps_2_value)
- 插入新行
我会在 PythonOperator 中执行此操作。
如果文件 3 出现并且看起来像 id | ps_2 | ps_2_value
我会填写缺失的列并进行插入。
感谢您的反馈。
最佳答案
加载两个先前的文件 example_data_1.csv
和 example_data_2.csv
后,我可以看到字段被插入到正确的列中,并根据需要添加新列.
编辑:灯泡时刻意识到 schema_update_options
存在。看这里:https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.SchemaUpdateOption.html
csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
task_id='csv_to_bigquery',
google_cloud_storage_conn_id='google_cloud_default',
bucket=airflow_bucket,
source_objects=['data/example_data_3.csv'],
skip_leading_rows=1,
bigquery_conn_id='google_cloud_default',
destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
autodetect=True,
dag=dag
)
关于google-bigquery - 带有架构更改的 Airflow Pipeline CSV 到 BigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60749503/