airflow - 使用 Apache Airflow 检查 Azure Datalake 上是否存在文件的最佳方法是什么?

标签 airflow azure-data-lake

我有一个 DAG,用于检查文件是否已上传到特定目录中的 Azure DataLake。如果是这样,则允许其他 DAG 运行。

我考虑过使用 FileSensor,但我认为 fsconnid 参数不足以针对 DataLake 进行身份验证

最佳答案

Azure provider 中没有 AzureDataLakeSensor但您可以轻松地实现一个,因为 AzureDataLakeHookcheck_for_file函数,因此所需要做的就是用实现 poke() 的 Sensor 类包装该函数BaseSensorOperator 的函数。通过这样做,您可以使用 Microsoft Azure Data Lake Connection直接。

我没有测试它,但这应该有效:

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator

class MyAzureDataLakeSensor(BaseSensorOperator):
    """
    Sense for files in Azure Data Lake

    :param path: The Azure Data Lake path to find the objects. Supports glob
        strings (templated)
    :param azure_data_lake_conn_id: The Azure Data Lake conn
    """

    template_fields: Sequence[str] = ('path',)
    ui_color = '#901dd2'

    def __init__(
        self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.path = path
        self.azure_data_lake_conn_id = azure_data_lake_conn_id

    def poke(self, context: "Context") -> bool:
        hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
        self.log.info('Poking for file in path: %s', self.path)
        try:
            hook.check_for_file(file_path=self.path)
            return True
        except FileNotFoundError:
            pass
        return False

使用示例:

MyAzureDataLakeSensor(
    task_id='adls_sense',
    path='folder/file.csv',
    azure_data_lake_conn_id='azure_data_lake_default',
    mode='reschedule'
)

关于airflow - 使用 Apache Airflow 检查 Azure Datalake 上是否存在文件的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72138993/

相关文章:

python - 无法从另一个文件夹导入类

amazon-web-services - AWS MWAA Cloudformation 堆栈创建失败并出现 NotStabilized

迭代文件夹时 Azure Datalake Gen1 权限被拒绝

python - 如何使用python在azure-data-lake中的文件上应用elasticsearch?

azure - 如何备份 ADLS GEN1 的 ACL?

excel - 如何在 Azure Data Lake、Azure SQL、Azure Data Lake Analytics 和 Azure SQL VM 之间做出决定?

c# - 将 C# 应用程序连接到 Azure Databricks

python - AirFlow DAG 在 DST 后运行两次

docker - 即使在我的 github 配置文件中创建并保存了公钥,远程 github 访问也被拒绝

smtp - Airflow :使用电子邮件运算符时无法分配请求的地址错误