airflow - 如何管理 Airflow dags 之间的 python 包?

标签 airflow

如果我有多个具有一些重叠 python 包依赖项的 Airflow dag,我该如何保留这些项目部门。解耦?例如。如果我在同一台服务器上有项目 A 和 B,我会运行它们中的每一个,比如......

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate

基本上,想在相同的情况下运行 dags(例如,每个 dag 使用的 python 脚本可能具有重叠的包 deps。我想单独开发(即,当想要更新时不必使用包更新所有代码)该软件包仅适用于一个项目))。注意,我一直在使用 BashOperator运行 python 任务,如...

do_stuff = BashOperator(
        task_id='my_task',
        bash_command='python /path/to/script.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)

有没有办法让这个工作?是否有其他一些最佳实践方法可以让人们解决(或避免)这些问题?

最佳答案

根据 apache-airflow 邮件列表中的讨论,解决我使用各种 python 脚本执行任务的模块化方式的最简单答案是直接为每个脚本或模块调用 virtualenv python 解释器二进制文件,例如。

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate
会转化为类似的东西
do_stuff_a = BashOperator(
        task_id='my_task_a',
        bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)
do_stuff_b = BashOperator(
        task_id='my_task_b',
        bash_command='/path/to/virtualenv_b/bin/python /path/to/script_b.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)
在 Airflow 中。

关于将 args 传递给任务的问题 ,这取决于您要传入的参数的性质。在我的情况下,某些参数取决于运行 dag 当天数据表的样子(例如,表中的最高时间戳记录等.)。要将这些参数添加到任务中,我有一个在此之前运行的“congif dag”。在 config dag 中,有一个 Task 将“真实”dag 的 args 生成为 python dict 并转换为 pickle 文件。然后“config” dag 有一个任务是 TriggerDagRunOperator激活“真实”dag,该dag具有从“config”dag生成的pickle文件读取的初始逻辑(在我的情况下,作为Dict),我将其读入bash_commandbash_command=f"python script.py {configs['arg1']}" 这样的字符串.

关于airflow - 如何管理 Airflow dags 之间的 python 包?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58403832/

相关文章:

python - 当尝试运行 dag 和 Airflow 返回状态代码2时

python - 获取 apache Airflow 任务的 unique_id

airflow - 如何限制 Airflow 一次仅运行一个 DAG 实例?

airflow - Apache Airflow 任务卡在 'up_for_retry' 状态

python - Airflow DAG 中的动态任务生成

airflow - 如何修复错误 "AirflowException("作业运行器的主机名不匹配“)”?

flask - Airflow基本身份验证-无法使用创建的用户登录

python - Airflow 不会触发并发 DAG `LocalExecutor`

airflow - Prometheus:如何根据任何 Airflow Dag 而不是特定 Airflow Dag 的结果创建警报

python - 如何在 Google Composer 上重启 Airflow 服务器?