我找到了以下链接:
https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
这确实解释了如何使用 TriggerDagRunOperator
来执行单独的 Airflow dag。该文档使用 Airflow 自己的示例 dag,但我很难理解它们,因为它们没有使用任何传感器。
有人可以解释一下如何使用 TriggerDagRunOperator
和 SqlSensor
启动单独的 dag 吗?当我的 SQL Server 作业任务完成时,我正在尝试启动单独的 DAG。我知道如何使用 SqlSensor
检查 SQL Server 作业的状态,但我不知道如何将结果附加到 TriggerDagRunOperator
以启动单独的 DAG。
我不想使用 Airflow CLI 或在一个 DAG 中执行这两项任务。基本上,我希望它只是触发 dag。
下面是我当前的代码,缺少关键的 conditionally_trigger
# File Name: check-when-db1-sql-task-is-done
from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators import SqlSensor
from datetime import datetime
default_args = {
'owner': 'airflow',
'retry_delay': timedelta(minutes=5),
}
dag = DAG('check-when-db1-sql-task-is-done',
description='Check-when-DB1-SQL-task-is-done',
default_args=default_args,
schedule_interval='@once',
start_date=datetime.now(),
)
# returns-0-or-1-based-on-job-task-status
sqlsensor = SqlSensor (
task_id='sql-sensor',
poke_interval=30,
timeout=3200,
sql="""select last_run_outcome from msdb.dbo.sysjobsteps where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450' and step_id = '1' and last_run_date = (select convert(varchar(24),getdate(),112)); """,
mssql_conn_id='db1',
dag=dag,
)
# dag-to-start
trigger = TriggerDagRunOperator (
task_id='start-ssh-job',
trigger_dag_id="qa-knime-ssh-task",
python_callable=conditionally_trigger,
params={'condition_param': True,
'message': 'Hello World'},
dag=dag)
最佳答案
我的理解是,TriggerDagRunOperator
用于当你想使用 python 函数来确定是否触发 SubDag 时。该函数在您的代码和示例中称为 conditionally_trigger
。
在您的情况下,您使用传感器来控制流量并且不需要传递函数。您可以使用 SubDagOperator
而不是 TriggerDagRunOperator
或传递一个简单的始终为真的函数作为 python_callable
:
...
python_callable=lambda(context, dag_run_obj):dag_run_obj,
...
关于python - 如何使用 TriggerDagRunOperator 触发 Airflow -dag,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45568439/