我正在尝试在 Python Operator 中使用 Airflow 宏,但我不断收到“airflow: error: unrecognized argument:”
因此,我正在导入一个具有 3 个位置参数的函数:(sys.argv,start_date,end_date),我希望使 start_date 和 end_date Airflow 中的执行日期。
函数参数看起来像这样
def main(argv,start_date,end_date):
这是我在 DAG 中的任务:
t1 = PythonOperator(
task_id='Pull_DCM_Report',
provide_context=True,
python_callable=main,
op_args=[sys.argv,'{{ ds }}','{{ ds }}'],
dag=dag)
最佳答案
由于您传递的日期需要由 Airflow 渲染,因此您需要使用 templates_dict
Python 运算符中的参数。该字段是 Airflow 唯一识别为包含模板的字段。
您可以创建一个自定义 Python 运算符,通过复制现有运算符并将相关字段添加到 template_fields
来将更多字段识别为模板。元组。
def main(**kwargs):
argv = kwargs.get('templates_dict').get('argv')
start_date = kwargs.get('templates_dict').get('start_date')
end_date = kwargs.get('templates_dict').get('end_date')
t1 = PythonOperator(task_id='Pull_DCM_Report',
provide_context=True,
python_callable=main,
templates_dict={'argv': sys.argv,
'start_date': '{{ yesterday_ds }}',
'end_date': '{{ ds }}'},
dag=dag)
关于Python 操作符中的 Airflow 宏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50708226/