azure - 循环遍历文件夹目录中的每个文件并检查日期 Azure Data Factory V2 -错误代码

标签 azure azure-data-factory

我想循环访问 stfp 文件夹中的每个文件并检查它是否是新的,然后将新文件复制到数据湖上 现在我有下面的代码,但我认为它不正确。在第二个 GetLastModifyfromFile 事件中没有使用 @item() 来引用循环中项目的最后日期,而是引用一个名为 SrcLocalFile 的完全不同的数据集。

{
"name": "IncrementalloadfromSingleFolder",
"properties": {
    "activities": [
        {
            "name": "GetFileList",
            "type": "GetMetadata",
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false
            },
            "typeProperties": {
                "dataset": {
                    "referenceName": "SrcLocalDir",
                    "type": "DatasetReference"
                },
                "fieldList": [
                    "childItems"
                ]
            }
        },
        {
            "name": "ForEachFile",
            "type": "ForEach",
            "dependsOn": [
                {
                    "activity": "GetFileList",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "typeProperties": {
                "items": {
                    "value": "@activity('GetFileList').output.childItems",
                    "type": "Expression"
                },
                "activities": [
                    {
                        "name": "GetLastModifyfromFile",
                        "type": "GetMetadata",
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false
                        },
                        "typeProperties": {
                            "dataset": {
                                "referenceName": "SrcLocalFile",
                                "type": "DatasetReference"
                            },
                            "fieldList": [
                                "lastModified"
                            ]
                        }
                    },
                    {
                        "name": "IfNewFile",
                        "type": "IfCondition",
                        "dependsOn": [
                            {
                                "activity": "GetLastModifyfromFile",
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            }
                        ],
                        "typeProperties": {
                            "expression": {
                                "value": "@and(less(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.current_time), greaterOrEquals(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.last_time))",
                                "type": "Expression"
                            },
                            "ifTrueActivities": [
                                {
                                    "name": "CopyNewFiles",
                                    "type": "Copy",
                                    "policy": {
                                        "timeout": "7.00:00:00",
                                        "retry": 0,
                                        "retryIntervalInSeconds": 30,
                                        "secureOutput": false
                                    },
                                    "typeProperties": {
                                        "source": {
                                            "type": "FileSystemSource",
                                            "recursive": false
                                        },
                                        "sink": {
                                            "type": "BlobSink"
                                        },
                                        "enableStaging": false,
                                        "dataIntegrationUnits": 0
                                    },
                                    "inputs": [
                                        {
                                            "referenceName": "SrcLocalFile",
                                            "type": "DatasetReference"
                                        }
                                    ],
                                    "outputs": [
                                        {
                                            "referenceName": "TgtBooksBlob",
                                            "type": "DatasetReference"
                                        }
                                    ]
                                }
                            ]
                        }
                    }
                ]
            }
        }
    ],
    "parameters": {
        "current_time": {
            "type": "String",
            "defaultValue": "2018-04-01T00:00:00Z"
        },
        "last_time": {
            "type": "String",
            "defaultValue": "2018-03-01T00:00:00Z"
        }
    },
    "folder": {
        "name": "IncrementalLoadSingleFolder"
    }
},
"type": "Microsoft.DataFactory/factories/pipelines"

}

最佳答案

只是一个想法 - 我没有看到你的数据集定义,但是......

是否应该将路径和文件名作为参数传递给数据集?

即将 2 个参数添加到路径和文件的数据集定义中(例如 pathparam 和 fileparam)。在数据集的 fileName 和folderName 设置中使用这些参数作为@dataset().pathparam 和@dataset().fileparam。

在上面的代码中,将参数传递给数据集输入的新“参数”部分,其路径参数和文件参数等于您从先前事件中检索的文件夹和子项。

注意 - 有一个错误,数据集名称中不能包含空格。

关于azure - 循环遍历文件夹目录中的每个文件并检查日期 Azure Data Factory V2 -错误代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53119503/

相关文章:

arrays - 是否可以在数据工厂复制事件中压平 json 文件?

azure - Dynamics 365(通过 Azure 数据工厂)到 Azure SQL Server 之间的数据传输存在时区差异

azure - 当 token 是 url 的一部分时,创建 REST 链接服务

java - Azure Java : cannot add plan information while creating VM

azure - 我可以通过arm模板在现有命名空间中创建新的事件中心吗?

javascript - 在 Azure 应用服务上部署 NextJS 应用后无法运行 npm start

azure - 为什么 Azure 数据工厂中的某些容错设置被禁用?

azure - 负载均衡器规则规范所需的映射列表

azure - 在上传到 Azure Blob 存储之前更改 IFormFile 的文件名

azure - Azure 数据工厂中的客户管理 key