我想在 DAG A 和 DAG B 上创建 dag 依赖关系。DAG A 有两个任务: TASK1 和 TASK2 。 DAG B 有 3 个任务:TASK1、TASK2 和 TASK3。
我的要求是 DAG B 在 DAG A TASK1 之后启动。
两个 DAGS 均按小时运行,DAG A 每小时运行一次,EX: 10.00,DAG B 每小时运行一次,EX:10.30。
我正在使用 Airflow 和操作符 EXternalTaskSensors,但它不起作用。
external_dag_id='DAG A',
external_task_id='TASK1',
allowed_states=None,
execution_delta=None,
execution_date_fn=None,
最佳答案
如果你在那里检查execution_delta
,你会发现没有,文档[ https://github.com/apache/incubator-airflow/blob/master/airflow/operators/sensors.py#L194]说:
:param execution_delta:time difference with the previous execution to look at, the default is the same execution_date as the current task. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
简短的回答是,由于您在不同时间运行 DAG A 和 DAG B,因此您需要放置execution_delta,否则,它假设您的其他 DAG 同时运行,在这种情况下,找不到 DAG 运行,您可能会得到意想不到的东西。所以尝试类似 datetime.timedelta(分钟=30)
关于python - AirflowExternalTaskSensors - 创建 dags 依赖项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46279361/