python - Apache Airflow - DAG registers as success even when critical tasks fail

Tags: python etl pipeline airflow

I'm new to Apache Airflow, and I want to write a DAG to move some data from a set of tables in a source database to a set of tables in a target database. I'm trying to design the DAG so that someone can simply write the `create table` and `insert into` SQL scripts for a new source table --> target table process and drop them into a folder. Then, on the next DAG run, the DAG would pick up the scripts from the folder and run the new tasks. I set up my DAG like this:

source_data_check_task_1 (Check Operator or ValueCheckOperator)
source_data_check_task_2 (Check Operator or ValueCheckOperator, Trigger on ALL_SUCCESS)
source_data_check_task_3 (Check Operator or ValueCheckOperator, Trigger on ALL_SUCCESS)

source_data_check_task_1 >> source_data_check_task_2 >> source_data_check_task_3

for tbl_name in tbl_name_list:
    tbl_exists_check (Check Operator, trigger on ALL_SUCCESS): check if `new_tbl` exists in database by querying `information_schema`
        tbl_create_task (SQL Operator, trigger on ALL_FAILED): run the `create table` SQL script
    tbl_insert_task (SQL Operator, trigger on ONE_SUCCESS): run the `insert into` SQL script

    source_data_check_task_3 >> tbl_exists_check
    tbl_exists_check >> tbl_create_task
    tbl_exists_check >> tbl_insert_task
    tbl_create_task >> tbl_insert_task

I'm running into two problems with this setup: (1) if any of the data quality check tasks fails, tbl_create_task still kicks off, because it triggers on ALL_FAILED, and (2) no matter which tasks fail, the DAG shows the run as a SUCCESS. That's fine when tbl_exists_check fails, since it's supposed to fail at least once, but not ideal if some critical task fails (like any of the data quality check tasks).
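If it helps, here is a minimal two-task sketch that reproduces symptom (2) as I understand it (the task names are made up; Airflow 1.x, where the run state appears to be derived from the leaf tasks):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

dag = DAG(
    'trigger_rule_repro',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None
)

# Simulates a failed data quality check.
failing_check = BashOperator(
    task_id='failing_check',
    bash_command='exit 1',
    dag=dag
)

# Fires *because* the upstream task failed, then succeeds itself.
fires_on_failure = BashOperator(
    task_id='fires_on_failure',
    bash_command='echo recovered',
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag
)

failing_check >> fires_on_failure

# fires_on_failure is the only leaf task and it succeeds, so the whole
# DAG run is recorded as SUCCESS even though failing_check failed.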

Is there a way to set up my DAG differently to address these problems?

Below is the actual code:

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.check_operator import ValueCheckOperator, CheckOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.utils.trigger_rule import TriggerRule

sql_path = Variable.get('sql_path')

default_args = {
    'owner': 'enmyj',
    'depends_on_past': True,
    'start_date': datetime(2018, 1, 1),  # required; tasks cannot run without a start_date
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    'test', 
    default_args=default_args, 
    schedule_interval=None,
    template_searchpath=sql_path
)

# check number of weeks in bill pay (made up example)
check_one = CheckOperator(
    task_id='check_one',
    conn_id='conn_name',
    sql="""select count(distinct field) from dbo.table having count(distinct field) >= 4 """,
    dag=dag
)

check_two = CheckOperator(
    task_id='check_two',
    conn_id='conn_name',
    sql="""select count(distinct field) from dbo.table having count(distinct field) <= 100""",
    dag=dag
)

check_one >> check_two

ls = ['foo','bar','baz','quz','apple']
for tbl_name in ls:
    exists = CheckOperator(
        task_id='tbl_exists_{}'.format(tbl_name),
        conn_id='conn_name',
        sql="""select count(*) from information_schema.tables where table_schema = 'test' and table_name = '{}'""".format(tbl_name),
        trigger_rule=TriggerRule.ALL_SUCCESS,
        depends_on_past=True,
        dag=dag
    )

    create = PostgresOperator(
        task_id='tbl_create_{}'.format(tbl_name),
        postgres_conn_id='conn_name',
        database='triforcedb',
        sql='create table test.{} (like dbo.source)'.format(tbl_name),  # will be read from a SQL file
        trigger_rule=TriggerRule.ONE_FAILED,
        depends_on_past=True,
        dag=dag
    )

    insert = PostgresOperator(
        task_id='tbl_insert_{}'.format(tbl_name),
        postgres_conn_id='conn_name',
        database='triforcedb',
        sql='insert into test.{} (select * from dbo.source limit 10)'.format(tbl_name),  # will be read from a SQL file
        trigger_rule=TriggerRule.ONE_SUCCESS,
        depends_on_past=True,
        dag=dag
    )

    check_two >> exists
    exists >> create
    create >> insert
    exists >> insert

Best Answer

You have a perfect use case for the BranchPythonOperator. It lets you perform the check to see whether the table exists and then proceed with creating the table before inserting into it, without having to worry about TRIGGER_RULES, and it makes your DAG's logic much clearer in the UI.
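A rough sketch of that approach follows. This is not the answerer's code: the choose_branch helper and the tbl_skip_create_* dummy tasks are made up for illustration, and it reuses the connection id, schema, and SQL from the question, assuming the same Airflow 1.x import paths as the code above.

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

dag = DAG(
    'branch_example',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None
)

def choose_branch(tbl_name):
    """Return the task_id of the branch to follow for this table."""
    hook = PostgresHook(postgres_conn_id='conn_name')
    count = hook.get_first(
        """select count(*) from information_schema.tables
           where table_schema = 'test' and table_name = %s""",
        parameters=(tbl_name,)
    )[0]
    # Create the table only when it does not exist yet.
    if count:
        return 'tbl_skip_create_{}'.format(tbl_name)
    return 'tbl_create_{}'.format(tbl_name)

for tbl_name in ['foo', 'bar', 'baz', 'quz', 'apple']:
    branch = BranchPythonOperator(
        task_id='tbl_exists_{}'.format(tbl_name),
        python_callable=choose_branch,
        op_kwargs={'tbl_name': tbl_name},
        dag=dag
    )

    create = PostgresOperator(
        task_id='tbl_create_{}'.format(tbl_name),
        postgres_conn_id='conn_name',
        database='triforcedb',
        sql='create table test.{} (like dbo.source)'.format(tbl_name),
        dag=dag
    )

    # No-op path taken when the table already exists.
    skip_create = DummyOperator(
        task_id='tbl_skip_create_{}'.format(tbl_name),
        dag=dag
    )

    insert = PostgresOperator(
        task_id='tbl_insert_{}'.format(tbl_name),
        postgres_conn_id='conn_name',
        database='triforcedb',
        sql='insert into test.{} (select * from dbo.source limit 10)'.format(tbl_name),
        # Exactly one of the two upstream branches succeeds and the other
        # is skipped, so ONE_SUCCESS lets the insert run either way.
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag=dag
    )

    branch >> create >> insert
    branch >> skip_create >> insert

Note that the insert task is reached through one of two mutually exclusive branches rather than hanging directly off the branch operator, so the skip from the untaken branch cannot knock it out. Wiring check_two >> branch as in the original DAG should also fix problem (2): a failed quality check propagates down as upstream_failed to the insert leaf tasks, so the run no longer registers as a SUCCESS.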

Regarding python - Apache Airflow - DAG registers as success even when critical tasks fail, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51708356/
