I have a use case where I download some JSON files and parse them. Depending on which file was downloaded, the program needs to load the data into different tables. Once the data has been loaded into the tables, an email notification must be sent.
For example, if the program needs to populate tables a and b (obtained from table_list), the workflow should look like download >> parse >> [load_table_a, load_table_b] >> send_email.
If tables a, b, c, and d are obtained from table_list, the workflow should look like download >> parse >> [load_table_a, load_table_b, load_table_c, load_table_d] >> send_email.
Here is what I am trying. Can someone help?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime
from download_script import download
from parse_script import parse
from load_2_sf_script import get_table_list, insert_into_sf
from airflow.utils.email import send_email_smtp
default_args = {
    'start_date': datetime(2021, 5, 18)
}

with DAG(
    'Test DAG',
    default_args=default_args,
    catchup=False
) as dag:

    download = PythonOperator(
        task_id='download',
        python_callable=download,
        email_on_failure=True,
        email='example@example.com'
    )

    parse = PythonOperator(
        task_id='parse',
        python_callable=parse,
        email_on_failure=True,
        email='example@example.com'
    )

    table_list = get_table_list()
    task_list = []
    for table in table_list:
        task_list.append(
            PythonOperator(
                task_id='load_table_{}'.format(table),
                python_callable=insert_into_sf,
                email_on_failure=True,
                email='example@example.com',
                op_kwargs={'category': table}
            )
        )

    send_email = EmailOperator(
        task_id='send_email',
        to=['example@example.com'],
        subject='Airflow: Success',
        html_content='Dag run completed successfully.'
    )

    download >> parse >> [task for task in task_list] >> send_email
Best Answer
Then this will work:
with DAG(
    'medical_device',
    default_args=default_args,
    catchup=False
) as dag:

    download_task = PythonOperator(
        task_id='download_task',
        python_callable=download,
        email_on_failure=True,
        email='example@example.com'
    )

    parse_task = PythonOperator(
        task_id='parse_task',
        python_callable=parse,
        email_on_failure=True,
        email='example@example.com'
    )

    send_email = EmailOperator(
        task_id='send_email',
        to=['example@example.com'],
        subject='Airflow: Success',
        html_content='Dag run completed successfully.'
    )

    download_task >> parse_task

    table_list = get_table_list()
    for table in table_list:
        op = PythonOperator(
            task_id='load_table_{}'.format(table),
            python_callable=insert_into_sf,
            email_on_failure=True,
            email='example@example.com',
            op_kwargs={'category': table}
        )
        parse_task >> op >> send_email
You don't need to build a list of tasks; you can set the upstream/downstream relationships dynamically inside the for loop with parse_task >> op >> send_email.
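Why this works: each pass through the loop adds one edge from parse_task to the new operator and one edge from that operator to send_email, so the load tasks have no dependencies among themselves and can run in parallel. Here is a plain-Python sketch of that wiring (no Airflow needed; the `Task` class below is a hypothetical stand-in for how BaseOperator overloads `>>`):

```python
class Task:
    """Minimal stand-in for Airflow's BaseOperator: records downstream edges."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `a >> b` means "b runs after a"; returning `b` lets chains like
        # `parse_task >> op >> send_email` keep working.
        self.downstream.append(other)
        return other

parse_task = Task("parse_task")
send_email = Task("send_email")

load_tasks = []
for table in ["a", "b", "c", "d"]:
    op = Task(f"load_table_{table}")
    parse_task >> op >> send_email   # same wiring as inside the DAG's for loop
    load_tasks.append(op)

# parse_task fans out to every load task; each load task fans in to send_email.
print([t.task_id for t in parse_task.downstream])
# → ['load_table_a', 'load_table_b', 'load_table_c', 'load_table_d']
```

The load tasks only ever appear in the middle of a `parse_task >> op >> send_email` chain, which is exactly the fan-out/fan-in shape the question asks for.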
Tip: try to keep each task_id consistent with the task's variable name. It is not required, but it is good practice.
Regarding "python - How to dynamically generate Airflow tasks in a loop and run them in parallel?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/67746038/