python - 如何循环动态生成airflow任务并并行运行?

标签 python airflow

我有一个用例,我正在下载一些 json 文件并解析它们。根据下载的文件,程序需要将数据填充到不同的表中。数据加载到表中后,必须发送电子邮件通知。

例如,如果程序需要填充表 a 和 b(从 table_list 获得),那么工作流应该看起来像下载 >> 解析 >> [load_table_a, load_table_b] >> send_email

如果表 a, b, c, d 是从 table_list 中获取的,那么工作流程应该类似于下载 >> 解析 >> [load_table_a, load_table_b, load_table_c, load_table_d] >> send_email

这是我正在尝试的。有人可以帮忙吗。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

from datetime import datetime

from download_script import download
from parse_script import parse
from load_2_sf_script import get_table_list, insert_into_sf
from airflow.utils.email import send_email_smtp


default_args = {
    'start_date': datetime(2021, 5, 18)
}


with DAG(
    'Test DAG',
    default_args = default_args,
    catchup = False
) as dag:

    download = PythonOperator(
        task_id = 'download',
        python_callable = download,
        email_on_failure = True,
        email = 'example@example.com'
    )

    parse = PythonOperator(
        task_id = 'parse',
        python_callable = parse,
        email_on_failure = True,
        email = 'example@example.com'
    )


    table_list = get_table_list()
    task_list = []
    for table in table_list:
        task_list.append(
            PythonOperator(
                task_id = 'load_table_{}'.format(table),
                python_callable = insert_into_sf,
                email_on_failure = True,
                email = 'example@example.com',
                op_kwargs = {'category': table}
            )
        )


    send_email = EmailOperator(
        task_id = 'send_email',
        to = ['example@example.com'],
        subject = 'Airflow: Success',
        html_content = 'Dag run completed succesfully.'
    )

    download >> parse >> [task for task in task_list] >> send_email

    

最佳答案

如果这是您所期望的: enter image description here

然后这将起作用:

with DAG(
    'medical_device',
    default_args=default_args,
    catchup=False
) as dag:
    download_task = PythonOperator(
        task_id='download_task',
        python_callable=download,
        email_on_failure=True,
        email='example@example.com'
    )

    parse_task = PythonOperator(
        task_id='parse_task',
        python_callable=parse,
        email_on_failure=True,
        email='example@example.com'
    )


    send_email = EmailOperator(
        task_id='send_email',
        to=['example@example.com'],
        subject='Airflow: Success',
        html_content='Dag run completed succesfully.'
    )

    download_task >> parse_task

    table_list = get_table_list()
    for table in table_list:
        op = PythonOperator(
                task_id='load_table_{}'.format(table),
                python_callable=insert_into_sf,
                email_on_failure=True,
                email='example@example.com',
                op_kwargs={'category': table}
            )
        parse_task >> op >> send_email

你不需要构造列表,你可以使用parse_task >> op >> send_email在for循环中动态设置上下游关系。

提示:尽量让您的 task_id 与任务的变量名称保持一致,这不是必需的,但却是一个很好的做法。

关于python - 如何循环动态生成airflow任务并并行运行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67746038/

相关文章:

google-cloud-platform - 如何从 Google Cloud Composer 获取 Airflow db 凭据

airflow - Airflow 中的动态任务 ID 名称

python - multiprocessing.pool.imap 是否有允许多个参数的变体(如星图)?

python - 在 python 中,是否有跨平台的方法来确定哪个进程正在监听给定端口?

python - pycharm 无法识别 opencv_createsamples 命令

由于卷中容器创建的链接,Docker-compose 构建失败

Airflow :在 Airflow 中更改 DAG 的 crontab 时间

airflow - 如何从 Airflow 中的变量写入/读取时间戳?

python - 当尝试将函数与Python中的图像匹配时,有没有办法计算残差?

python - 单元测试以检查未生成的迁移