我正在寻找类似的东西
CreateBQTableOperator(
query='select * from my_table',
output_table='my_other_table'
)
我正在寻找现有的运算符或此类运算符的代码。运算符(operator)应该使用另一个参数来决定是否在重新创建表或将查询附加到当前表之前删除该表(如果该表存在)。
最佳答案
对于 Airflow >= 1.10 以及提供程序,您可以使用 BigQueryInsertJobOperator
此运算符正在使用 JobConfigurationQuery
您可以使用 configuration
参数配置 API 支持的任何选项:
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
execute_query_save = BigQueryInsertJobOperator(
task_id="execute_query_save",
configuration={
"query": {
"query": "select * from my_table",
"useLegacySql": False,
"writeDisposition": "WRITE_EMPTY",
'destinationTable': {
'projectId': "my-project",
'datasetId': "my_data_set",
'tableId': "table2"
},
}
},
)
对于较旧的 Airflow 版本,您可以使用 BigQueryExecuteQueryOperator
:
运营商有destination_dataset_table
:
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
execute_query_save = BigQueryExecuteQueryOperator(
task_id="execute_query_save",
sql="SELECT * FROM my_data_set.table1",
use_legacy_sql=False,
destination_dataset_table="my_data_set.table2",
location="southamerica-east1",
write_disposition="WRITE_EMPTY",
create_disposition="CREATE_IF_NEEDED",
)
您可以通过设置参数值来控制请求的行为(引用 Google docs 中的值)。
write_disposition
选项为:
WRITE_TRUNCATE:如果表已存在,BigQuery 将覆盖表数据并使用查询结果中的架构。
WRITE_APPEND:如果表已存在,BigQuery 会将数据附加到表中。
WRITE_EMPTY:如果表已存在并包含数据,则作业结果中将返回“重复”错误。
create_disposition
选项是:
CREATE_IF_NEEDED:如果该表不存在,BigQuery 将创建该表。
CREATE_NEVER:该表必须已经存在。如果没有,作业结果中将返回“notFound”错误。
关于python - Airflow 中是否有运算符可以根据 BigQuery 中的查询创建表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68575367/