python - 为每个文件运行 Airflow DAG

标签 python airflow

所以我在 airflow 中有这个非常好的 DAG,它基本上在二进制文件上运行几个分析步骤(作为 airflow 插件实现)。 DAG 由 ftp 传感器触发,它只检查 ftp 服务器上是否有新文件,然后启动整个工作流程。

所以目前的工作流程是这样的:DAG 已按定义触发 -> 传感器等待 ftp 上的新文件 -> 执行分析步骤 -> 工作流程结束。

我想要的是这样的:DAG 是触发器 -> 传感器等待 ftp 上的新文件 -> 对于 ftp 上的每个文件,分析步骤单独执行 -> 每个工作流单独结束。

如何为 ftp 服务器上的每个文件执行分析工作流,如果服务器上没有文件,只有一个传感器应该等待新文件? 例如,我不想每隔一秒左右启动一个 DAG,因为那时我有很多传感器正在等待新文件。

最佳答案

使用 2 个 DAG 将传感步骤与分析步骤分开。

DAG 1:

传感器等待 ftp 上的新文件 -> 一旦新文件到达,使用 TriggerDagRunOperator 触发 DAG 1 本身 -> 使用 TriggerDagRunOperator 触发 DAG 2

DAG 2:

对文件进行分析

关于python - 为每个文件运行 Airflow DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54992541/

相关文章:

python - 如何检查python字典内的内部字典中是否存在键?

airflow - 如果下游任务在 Airflow 中失败,如何重新运行上游任务(使用 Sub Dags)

python - Airflow :将 {{ ds }} 作为参数传递给 PostgresOperator

postgresql - Airflow 1.9 - 任务卡在队列中

版本\"v1\"中的 KubernetesPodOperator Pod 不能作为 Pod : v1. Pod.Spec 处理

Python - 按成员查找天数差异分组

python - DiD 或 UNIX 套接字的安全替代方案?

pandas - Apache Airflow 或 Apache Beam 用于数据处理和作业调度

python - 模拟与魔术模拟

python - 如何将新数据 append 到新行