python - Airflow FileSensor 的任何示例?

标签 python airflow

谁能告诉我如何使用 Airflow FileSensor 的示例? 我用谷歌搜索,还没有找到任何东西。任何例子就足够了。我的用例非常简单:

等待预定的 DAG 将文件放到路径中,FileSensor 任务拾取它,读取内容并处理它。

最佳答案

来自documentation & source code :

from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator    import DummyOperator

import datetime
import airflow

# https://airflow.apache.org/code.html#airflow.models.BaseOperator
default_args = {
    "depends_on_past" : False,
    "start_date"      : airflow.utils.dates.days_ago( 1 ),
    "retries"         : 1,
    "retry_delay"     : datetime.timedelta( hours= 5 ),
}

with airflow.DAG( "file_sensor_test_v1", default_args= default_args, schedule_interval= "*/5 * * * *", ) as dag:

    start_task  = DummyOperator(  task_id= "start" )
    stop_task   = DummyOperator(  task_id= "stop"  )
    sensor_task = FileSensor( task_id= "my_file_sensor_task", poke_interval= 30, fs_conn_id= <path>, filepath= <file or directory name> )

start_task >> sensor_task >> stop_task

关于python - Airflow FileSensor 的任何示例?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54791596/

相关文章:

python - td 数据的网络抓取

python - 在Python/Airflow中加密数据并在BigQuery中解密的方法

测试整个 Airflow DAG 而不是单个任务

python - 在 Airflow 2.0 中运行多个 Athena 查询

python - Airflow : Passing a dynamic value to Sub DAG operator

python - 如何根据标题名称移动 csv 中的数据并将其移动到同一列?

python - 代码在本地主机上运行,​​但在使用 Google App Engine 部署时不起作用

python - 在 x64 架构上获得心理加速?

python - 如何使用 "__str__"方法?

mysql - Airflow需要mysql吗?