python - 如何使用 Airflow 变量动态运行 DAG 的多个作业

标签 python airflow

我知道如何使用变量动态运行 DAG 的任务,并且它工作得很好,直到您触发同一 DAG 的多次运行。

即,一旦在data/to/load/目录下创建了一个包含文件的新目录,我就会在某处编写一个脚本,该脚本将触发airflow变量-set dir data/to/load/$newDir 后跟 airflow trigger_dag dyn_test。现在假设目录“a”和“b”在data/to/load/下创建(同时),这将使airflow变量+airflow trigger_dag调用两次变量集调用有两个不同的输入(一个以“a”为后缀,另一个以“b”为后缀)。我在 Airflow GUI 中看到两个为 DAG 运行的作业,但问题是它们都考虑相同的目录值 a 或 b。这肯定意味着它需要最终的“Airflow 变量设置”调用。我该如何解决?触发多次运行的方法是什么,每次运行都采用不同的值(在 dir 变量中)来动态循环。我的达格看起来像这样:

# Using Airflow Variables
from airflow.models import Variable
dir = Variable.get("dir")


args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)


filesInDir = next(os.walk(dir))[2] 

for file in filesInDir:
    task1 = # change 'file' structure
    task2 = # store changed 'file'

    task1 >> task2

最佳答案

您的问题中描述的场景是一个先进先出队列适合的场景,假设您希望保留显式设置要作为单独处理的目录的当前方式序列。

也就是说,Airflow CLI trigger_dags 命令允许传递 --conf 标志来设置在 DagRun 中传递的配置字典,我正如您所描述的那样,在设置变量的地方,就会触发 dag。

http://airflow.apache.org/cli.html#trigger_dag

这是代码中可能的样子。

airflow trigger_dag dyn_test --conf '{"me_seeks.dir": "data/to/load/$newDir"}'

您将在用于任务的 Airflow 运算符中设置 provide_context kwargs

可以在上下文中检索 DagRun 的实例,并在检索的配置中设置 dir

假设您使用 Airflow PythonOperator 定义了任务;那么您在 python_callable 中检索 dir 的代码将类似于以下内容:

def me_seeks(dag_run=None):
    dir = dag_run.conf['me_seeks.dir']

关于python - 如何使用 Airflow 变量动态运行 DAG 的多个作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55674682/

相关文章:

python - 从数据库 html 内容渲染一个 django 模板

python - 为每个文件运行 Airflow DAG

python - MySQL 到云存储桶 Airflow DAG 的 UnicodeDecodeError

airflow - 在命令行中测试 Airflow Dag

ubuntu - 防止禁用的 DAG 在启用时立即运行

rest - 如何使用 Airflow DAG 调用 REST 端点

python - 基于语言测试的阿拉伯语句子过滤列表 : Why so slow?

python - django 创建多种类型用户的最佳方法

python - wxPython:制作可滚动的 DC

python - 计算允许 R 中的 QWERTY 错误的 Levenshtein 距离