谁能告诉我如何使用 Airflow FileSensor 的示例? 我用谷歌搜索,还没有找到任何东西。任何例子就足够了。我的用例非常简单:
等待预定的 DAG 将文件放到路径中,FileSensor 任务拾取它,读取内容并处理它。
最佳答案
来自documentation & source code :
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator
import datetime
import airflow
# https://airflow.apache.org/code.html#airflow.models.BaseOperator
default_args = {
"depends_on_past" : False,
"start_date" : airflow.utils.dates.days_ago( 1 ),
"retries" : 1,
"retry_delay" : datetime.timedelta( hours= 5 ),
}
with airflow.DAG( "file_sensor_test_v1", default_args= default_args, schedule_interval= "*/5 * * * *", ) as dag:
start_task = DummyOperator( task_id= "start" )
stop_task = DummyOperator( task_id= "stop" )
sensor_task = FileSensor( task_id= "my_file_sensor_task", poke_interval= 30, fs_conn_id= <path>, filepath= <file or directory name> )
start_task >> sensor_task >> stop_task
关于python - Airflow FileSensor 的任何示例?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54791596/