我有一个 DAG,用于检查文件是否已上传到特定目录中的 Azure DataLake。如果是这样,则允许其他 DAG 运行。
我考虑过使用 FileSensor,但我认为 fsconnid 参数不足以针对 DataLake 进行身份验证
最佳答案
Azure provider 中没有 AzureDataLakeSensor
但您可以轻松地实现一个,因为 AzureDataLakeHook
有check_for_file
函数,因此所需要做的就是用实现 poke()
的 Sensor 类包装该函数BaseSensorOperator
的函数。通过这样做,您可以使用 Microsoft Azure Data Lake Connection直接。
我没有测试它,但这应该有效:
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator
class MyAzureDataLakeSensor(BaseSensorOperator):
"""
Sense for files in Azure Data Lake
:param path: The Azure Data Lake path to find the objects. Supports glob
strings (templated)
:param azure_data_lake_conn_id: The Azure Data Lake conn
"""
template_fields: Sequence[str] = ('path',)
ui_color = '#901dd2'
def __init__(
self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
) -> None:
super().__init__(**kwargs)
self.path = path
self.azure_data_lake_conn_id = azure_data_lake_conn_id
def poke(self, context: "Context") -> bool:
hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
self.log.info('Poking for file in path: %s', self.path)
try:
hook.check_for_file(file_path=self.path)
return True
except FileNotFoundError:
pass
return False
使用示例:
MyAzureDataLakeSensor(
task_id='adls_sense',
path='folder/file.csv',
azure_data_lake_conn_id='azure_data_lake_default',
mode='reschedule'
)
关于airflow - 使用 Apache Airflow 检查 Azure Datalake 上是否存在文件的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72138993/