python - 如何使用 TriggerDagRunOperator 触发 Airflow -dag

标签 python sql triggers directed-acyclic-graphs apache-airflow

我找到了以下链接:

https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

这确实解释了如何使用 TriggerDagRunOperator 来执行单独的 Airflow dag。该文档使用 Airflow 自己的示例 dag,但我很难理解它们,因为它们没有使用任何传感器。

有人可以解释一下如何使用 TriggerDagRunOperatorSqlSensor 启动单独的 dag 吗?当我的 SQL Server 作业任务完成时,我正在尝试启动单独的 DAG。我知道如何使用 SqlSensor 检查 SQL Server 作业的状态,但我不知道如何将结果附加到 TriggerDagRunOperator 以启动单独的 DAG。

我不想使用 Airflow CLI 或在一个 DAG 中执行这两项任务。基本上,我希望它只是触发 dag。

下面是我当前的代码,缺少关键的 conditionally_trigger

# File Name: check-when-db1-sql-task-is-done

from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators import SqlSensor
from datetime import datetime


default_args = {
        'owner': 'airflow',
        'retry_delay': timedelta(minutes=5),
}

dag = DAG('check-when-db1-sql-task-is-done',
        description='Check-when-DB1-SQL-task-is-done',
        default_args=default_args,
        schedule_interval='@once',
        start_date=datetime.now(),
        )

# returns-0-or-1-based-on-job-task-status
sqlsensor = SqlSensor (
        task_id='sql-sensor',
        poke_interval=30,
        timeout=3200,
        sql="""select last_run_outcome from msdb.dbo.sysjobsteps where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450' and step_id = '1' and last_run_date = (select convert(varchar(24),getdate(),112)); """,    
        mssql_conn_id='db1',
        dag=dag,
        )

# dag-to-start
trigger = TriggerDagRunOperator (
        task_id='start-ssh-job',
        trigger_dag_id="qa-knime-ssh-task",
        python_callable=conditionally_trigger,
        params={'condition_param': True,
                'message': 'Hello World'},
        dag=dag)

最佳答案

我的理解是,TriggerDagRunOperator 用于当你想使用 python 函数来确定是否触发 SubDag 时。该函数在您的代码和示例中称为 conditionally_trigger

在您的情况下,您使用传感器来控制流量并且不需要传递函数。您可以使用 SubDagOperator 而不是 TriggerDagRunOperator 或传递一个简单的始终为真的函数作为 python_callable:

...
python_callable=lambda(context, dag_run_obj):dag_run_obj,
...

关于python - 如何使用 TriggerDagRunOperator 触发 Airflow -dag,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45568439/

相关文章:

创建巨大对象后,Python 在函数结束时挂起数小时

java - 如果多个用户输入为空则返回 SQL 搜索

mysql - 如何在 mysql 中执行删除级联?提供的示例

python 回合问题

python - 如何使用 Django 获取两条随机记录

sql - 在多个查询中使用一个结果集

mysql - 使用Prepare Statement MySQL时出现语法错误

function - 需要有关 Postgres 触发器和功能的帮助

python - Django 获取相关对象的多对多关系

sql - 简单的自连接查询性能不佳