Python 操作符中的 Airflow 宏

标签 airflow

我正在尝试在 Python Operator 中使用 Airflow 宏,但我不断收到“airflow: error: unrecognized argument:”

因此,我正在导入一个具有 3 个位置参数的函数:(sys.argv,start_date,end_date),我希望使 start_date end_date Airflow 中的执行日期。

函数参数看起来像这样

def main(argv,start_date,end_date):

这是我在 DAG 中的任务:

t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=main,
    op_args=[sys.argv,'{{ ds }}','{{ ds }}'],
    dag=dag)

最佳答案

由于您传递的日期需要由 Airflow 渲染,因此您需要使用 templates_dict Python 运算符中的参数。该字段是 Airflow 唯一识别为包含模板的字段。

您可以创建一个自定义 Python 运算符,通过复制现有运算符并将相关字段添加到 template_fields 来将更多字段识别为模板。元组。

def main(**kwargs):
    argv = kwargs.get('templates_dict').get('argv')
    start_date = kwargs.get('templates_dict').get('start_date')
    end_date = kwargs.get('templates_dict').get('end_date')


t1 = PythonOperator(task_id='Pull_DCM_Report',
                    provide_context=True,
                    python_callable=main,
                    templates_dict={'argv': sys.argv,
                                    'start_date': '{{ yesterday_ds }}',
                                    'end_date': '{{ ds }}'},
                    dag=dag)

关于Python 操作符中的 Airflow 宏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50708226/

相关文章:

airflow-scheduler - 如何使用 nohup 检查 SSHOperator 以查看工作是否完成以完成 dag?

airflow - 为什么任务会在 Airflow 中卡住而无法执行?

python - 任务组中的 Airflow 2 Xcom

python - 配置 SnakeBite HDFS 客户端以使用高可用性模式

python - 导入错误 : import apache_beam as beam. 找不到模块

postgresql - DOCKER - Airflow 当我进行 docker compose 时,如何在 Airflow DB 中初始化我的 postgres 脚本

google-cloud-platform - 您可以在 Google Cloud Composer 中访问 Airflow CLI 吗?

airflow - 如何从 Airflow 打包的 DAG 中读取配置文件?

python - 使用 Airflow MySqlOperator 时插入查询中的语法错误

postgresql - Airflow 将 postgres 数据库的所有表导出到 BigQuery