airflow - 如何将 BigQueryOperator 与 execution_date 一起使用?

标签 airflow

这是我的代码:

EXEC_TIMESTAMP  = "{{  execution_date.strftime('%Y-%m-%d %H:%M')  }}"
query = """
        select ... where date_purchased between TIMESTAMP_TRUNC(cast ( {{ params.run_timestamp }} as TIMESTAMP), HOUR, 'UTC') ...
        """
generate_op = BigQueryOperator(
                    bql=query,
                    destination_dataset_table=table_name,
                    task_id='generate',
                    bigquery_conn_id=CONNECTION_ID,
                    use_legacy_sql=False,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    query_params={'run_timestamp': EXEC_TIMESTAMP},
                    dag=dag)

这应该有效,但实际上无效。 渲染选项卡显示:

between TIMESTAMP_TRUNC(cast (  as TIMESTAMP), HOUR, 'UTC')

缺少日期。它正在变得虚无。

我该如何解决这个问题?此运算符没有 provide_context=True。我不知道该怎么办。

最佳答案

Luis,query_params 不是您可以在模板上下文中引用的params。它们没有添加到其中。由于 params 为空,因此您的 {{ params.run_timestamp }}""None。如果您将其更改为 params={'run_timestamp':…} 它仍然会有问题,因为 params 值未模板化。因此,当您使用模板化字段 bql 来包含 {{ params.run_timestamp }} 时,您将准确了解 params: {'run_timestamp': …str ... } 在没有对该值进行任何递归扩展的情况下填写。你应该得到 {{ execution_date.strftime('%Y-%m-%d %H:%M') }}

让我尝试为您重写这个(但我不确定周围的括号是否正确转换):

generate_op = BigQueryOperator(
                    sql="""
select ...
where date_purchased between
  TIMESTAMP_TRUNC(cast('{{execution_date.strftime('%Y-%m-%d %H:%M')}}') as TIMESTAMP), HOUR, 'UTC')
...
                    """,
                    destination_dataset_table=table_name,
                    task_id='generate',
                    bigquery_conn_id=CONNECTION_ID,
                    use_legacy_sql=False,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    dag=dag,
)

您可以 see the bql and sql fields are templated .然而 bql field is deprecated and removed在后面的代码中。

关于airflow - 如何将 BigQueryOperator 与 execution_date 一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53828163/

相关文章:

apache-spark - 通过 Airflow 调度在 Kubernetes 上运行的 Spark 作业

python - 如何在 Airflow 中通过 SSH 并运行 PythonOperator

python - Airflow scheduler 是否有可能在开始下一天之前先完成前一天的周期?

hadoop - 如何使用 Airflow 运行 HDFS Copy 命令?

即使端点进程完成后,Airflow 任务仍在运行

airflow - 如何触发 Airflow 任务运行的电子邮件

python - BranchPythonOperator 后的 Airflow 任务不会失败并正确成功

airflow - 如何定义不应定期运行的 Airflow DAG/任务

python - 如何隐藏/屏蔽 Airflow 连接和可变截面的敏感数据?

airflow - 如何使用 Apache Airflow 的 DataprocCreateClusterOperator 在 Dataproc(GCP) 上启用 Spark Web 界面