airflow - 如何在 Airflow 中实现轮询?

标签 airflow

我想使用 Airflow 实现定期轮询外部系统(ftp 服务器等)的数据流,检查符合特定条件的新文件,然后为这些文件运行一堆任务。现在,我是 Airflow 的新手,读到传感器是您在这种情况下可以使用的东西,实际上我设法编写了一个传感器,当我为它运行“Airflow 测试”时,它可以正常工作。但是我对传感器的 poke_interval 和 DAG 调度的关系有点困惑。我应该如何为我的用例定义这些设置?或者我应该使用其他方法吗?我只希望 Airflow 在这些文件可用时运行任务,而不是在一段时间内没有新文件可用时让仪表板充满故障。

最佳答案

您的理解是正确的,当您想要轮询时,使用传感器是可行的方法,可以使用现有传感器或实现您自己的传感器。

但是,它们始终是 DAG 的一部分,并且不会在其边界之外执行。 DAG 执行取决于 start_dateschedule_interval,但您可以利用它和传感器根据外部服务器的状态实现某种 DAG:一种可能的方法是使用传感器启动整个 DAG,该传感器检查条件是否发生,如果条件不满足则决定跳过整个 DAG(您可以确保传感器将下游任务标记为 skipped 而不是 failed 通过将它们的 soft_fail 参数设置为 True)。通过使用最频繁的调度选项 (* * * * *),您可以设置一分钟的轮询间隔。如果您确实需要最短的轮询时间,您可以调整传感器的poke_intervaltimeout 参数。

但是请记住,Airflow 本身可能无法保证执行时间,因此对于非常短的轮询时间,您可能需要研究替代方案(或者至少考虑与我刚刚分享的方法不同的方法)。

关于airflow - 如何在 Airflow 中实现轮询?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48248369/

相关文章:

airflow - 使用 Airflow 的 DataflowPythonOperator 安排数据流作业时出错

python - 我们如何使用 apache airflow API 创建 dataproc 集群

Airflow - 并行执行 X 个动态任务,最多 4 个任务

airflow - 将代码分发到 Airflow 网络服务器/调度程序+工作人员和工作流的最佳方式

deployment - 使用 Airflow 进行零停机部署

celery - 向 Airflow 添加额外的 celery 配置

rabbitmq - Airflow Scheduler 和 Web 服务器在对 RabbitMQ 上运行的任务进行排队时挂起

python - 使用相同的DAGS创建新环境时,缺少FERNET_KEY配置

ssl - 在 GCP Composer Airflow 中使用 AWS SES SMTP 服务器时出现 "SSL: WRONG_VERSION_NUMBER"

python-3.x - 在KubernetesPods上设置HTTP请求设置