首先,我使用 MySQL 查询从生产数据库获取所有数据,然后将该数据作为 NEW LINE DELIMITED JSON
存储在谷歌云存储中,我想要做的是:
1.检查表是否存在
2.如果表不存在,则使用自动检测模式创建表
3.存储数据
所有这些都将在 airflow 中安排。真正让我困惑的是数字 2
,我如何在 Python 中做到这一点?或者 Airflow 可以自动执行此操作吗?
最佳答案
Airflow 可以自动执行此操作。 create_disposition
参数根据需要创建表。而 autodetect
参数正是您所需要的。这是针对 Airflow 1.10.2 的。
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='test_bucket',
source_objects=['folder1/*.csv', 'folder2/*.csv'],
destination_project_dataset_table='dest_table',
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='bq-conn',
google_cloud_storage_conn_id='gcp-conn',
autodetect=True, # This uses autodetect
dag=dag
)
关于python - 大查询 : Create table if not exist and load data using Python and Apache AirFlow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55349732/