python - 大查询 : Create table if not exist and load data using Python and Apache AirFlow

标签 python mysql json google-bigquery airflow

首先,我使用 MySQL 查询从生产数据库获取所有数据,然后将该数据作为 NEW LINE DELIMITED JSON 存储在谷歌云存储中,我想要做的是:
1.检查表是否存在
2.如果表不存在,则使用自动检测模式创建表
3.存储数据

所有这些都将在 airflow 中安排。真正让我困惑的是数字 2,我如何在 Python 中做到这一点?或者 Airflow 可以自动执行此操作吗?

最佳答案

Airflow 可以自动执行此操作。 create_disposition 参数根据需要创建表。而 autodetect 参数正是您所需要的。这是针对 Airflow 1.10.2 的。

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='dest_table',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    autodetect=True, # This uses autodetect
    dag=dag
)

关于python - 大查询 : Create table if not exist and load data using Python and Apache AirFlow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55349732/

相关文章:

php - 使用 PHP 和 MYSQL 对已知列进行搜索

json - 如何使用从 Swift 3 中的 slider 选择的 JSON 数据更新 tableviewcell?

javascript - 使用 $.each 和 .append 从 JSON 对象构造 HTML

python - 如何在 Django 中使用点作为千位分隔符

Python:使用鼠标在任何窗口中围绕对象绘制一个矩形?还将开始和结束坐标存储为相对于所述窗口的变量?

python - 计算numpy数组的边界框

python - Python 中的 HTTPS session 重用

php - mySQL - 根据一个字段对表中的列求和

mysql - 在rails中如何在表中插入数千条记录

json - 如何从 get 请求中获取 json 响应而不是 html 响应?