python - AirflowExternalTask​​Sensors - 创建 dags 依赖项

标签 python airflow airflow-scheduler

我想在 DAG A 和 DAG B 上创建 dag 依赖关系。DAG A 有两个任务: TASK1 和 TASK2 。 DAG B 有 3 个任务:TASK1、TASK2 和 TASK3。

我的要求是 DAG B 在 DAG A TASK1 之后启动。

两个 DAGS 均按小时运行,DAG A 每小时运行一次,EX: 10.00,DAG B 每小时运行一次,EX:10.30。

我正在使用 Airflow 和操作符 EXternalTask​​Sensors,但它不起作用。

external_dag_id='DAG A',
external_task_id='TASK1',
allowed_states=None,
execution_delta=None,
execution_date_fn=None,

最佳答案

如果你在那里检查execution_delta,你会发现没有,文档[ https://github.com/apache/incubator-airflow/blob/master/airflow/operators/sensors.py#L194]说:

:param execution_delta:time difference with the previous execution to look at, the default is the same execution_date as the current task. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.

简短的回答是,由于您在不同时间运行 DAG A 和 DAG B,因此您需要放置execution_delta,否则,它假设您的其他 DAG 同时运行,在这种情况下,找不到 DAG 运行,您可能会得到意想不到的东西。所以尝试类似 datetime.timedelta(分钟=30)

关于python - AirflowExternalTask​​Sensors - 创建 dags 依赖项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46279361/

相关文章:

python - GAE中的后台进程

python - 如何使用 Python 将嵌套函数调用重构为代码行?

timeout - Airflow 列表 dag 在 30 秒后恰好超时

kubernetes - Airflow 没有名为 'kubernetes' 的模块

python - 将 Airflow 的 PostgresOperator 与 Jinja 模板和 SQL 一起使用时出现 TemplateNotFound

Airflow :schedule_interval = '@once'

Airflow 1.9.0 正在排队但任务未运行

python - 如何用 Python 编写自己的 OrderedDict 类?

python - Selenium 上的图像/文件上传

hadoop - Apache Airflow 分布式处理