如果我有多个具有一些重叠 python 包依赖项的 Airflow dag,我该如何保留这些项目部门。解耦?例如。如果我在同一台服务器上有项目 A 和 B,我会运行它们中的每一个,比如......
source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate
基本上,想在相同的情况下运行 dags(例如,每个 dag 使用的 python 脚本可能具有重叠的包 deps。我想单独开发(即,当想要更新时不必使用包更新所有代码)该软件包仅适用于一个项目))。注意,我一直在使用
BashOperator
运行 python 任务,如...do_stuff = BashOperator(
task_id='my_task',
bash_command='python /path/to/script.py'),
execution_timeout=timedelta(minutes=30),
dag=dag)
有没有办法让这个工作?是否有其他一些最佳实践方法可以让人们解决(或避免)这些问题?
最佳答案
根据 apache-airflow 邮件列表中的讨论,解决我使用各种 python 脚本执行任务的模块化方式的最简单答案是直接为每个脚本或模块调用 virtualenv python 解释器二进制文件,例如。
source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate
会转化为类似的东西do_stuff_a = BashOperator(
task_id='my_task_a',
bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py'),
execution_timeout=timedelta(minutes=30),
dag=dag)
do_stuff_b = BashOperator(
task_id='my_task_b',
bash_command='/path/to/virtualenv_b/bin/python /path/to/script_b.py'),
execution_timeout=timedelta(minutes=30),
dag=dag)
在 Airflow 中。关于将 args 传递给任务的问题 ,这取决于您要传入的参数的性质。在我的情况下,某些参数取决于运行 dag 当天数据表的样子(例如,表中的最高时间戳记录等.)。要将这些参数添加到任务中,我有一个在此之前运行的“congif dag”。在 config dag 中,有一个 Task 将“真实”dag 的 args 生成为 python dict 并转换为 pickle 文件。然后“config” dag 有一个任务是
TriggerDagRunOperator
激活“真实”dag,该dag具有从“config”dag生成的pickle文件读取的初始逻辑(在我的情况下,作为Dict
),我将其读入bash_command
像 bash_command=f"python script.py {configs['arg1']}"
这样的字符串.
关于airflow - 如何管理 Airflow dags 之间的 python 包?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58403832/