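I want to loop through each file in an SFTP folder, check whether it is new, and then copy the new files to the data lake.
Right now I have the code below, but I don't think it is correct. The second activity, GetLastModifyfromFile, does not use @item() to get the last-modified date of the current item in the loop. Instead, it references a completely different dataset called SrcLocalFile.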
{
  "name": "IncrementalloadfromSingleFolder",
  "properties": {
    "activities": [
      {
        "name": "GetFileList",
        "type": "GetMetadata",
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false
        },
        "typeProperties": {
          "dataset": {
            "referenceName": "SrcLocalDir",
            "type": "DatasetReference"
          },
          "fieldList": [
            "childItems"
          ]
        }
      },
      {
        "name": "ForEachFile",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "GetFileList",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "typeProperties": {
          "items": {
            "value": "@activity('GetFileList').output.childItems",
            "type": "Expression"
          },
          "activities": [
            {
              "name": "GetLastModifyfromFile",
              "type": "GetMetadata",
              "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false
              },
              "typeProperties": {
                "dataset": {
                  "referenceName": "SrcLocalFile",
                  "type": "DatasetReference"
                },
                "fieldList": [
                  "lastModified"
                ]
              }
            },
            {
              "name": "IfNewFile",
              "type": "IfCondition",
              "dependsOn": [
                {
                  "activity": "GetLastModifyfromFile",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "typeProperties": {
                "expression": {
                  "value": "@and(less(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.current_time), greaterOrEquals(activity('GetLastModifyfromFile').output.lastModified, pipeline().parameters.last_time))",
                  "type": "Expression"
                },
                "ifTrueActivities": [
                  {
                    "name": "CopyNewFiles",
                    "type": "Copy",
                    "policy": {
                      "timeout": "7.00:00:00",
                      "retry": 0,
                      "retryIntervalInSeconds": 30,
                      "secureOutput": false
                    },
                    "typeProperties": {
                      "source": {
                        "type": "FileSystemSource",
                        "recursive": false
                      },
                      "sink": {
                        "type": "BlobSink"
                      },
                      "enableStaging": false,
                      "dataIntegrationUnits": 0
                    },
                    "inputs": [
                      {
                        "referenceName": "SrcLocalFile",
                        "type": "DatasetReference"
                      }
                    ],
                    "outputs": [
                      {
                        "referenceName": "TgtBooksBlob",
                        "type": "DatasetReference"
                      }
                    ]
                  }
                ]
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      "current_time": {
        "type": "String",
        "defaultValue": "2018-04-01T00:00:00Z"
      },
      "last_time": {
        "type": "String",
        "defaultValue": "2018-03-01T00:00:00Z"
      }
    },
    "folder": {
      "name": "IncrementalLoadSingleFolder"
    }
  },
  "type": "Microsoft.DataFactory/factories/pipelines"
}
Best answer
Just a thought: I haven't seen your dataset definitions, but...
Shouldn't you be passing the path and file name to the dataset as parameters?
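That is, add two parameters to the dataset definition, one for the path and one for the file (for example pathparam and fileparam). Reference them in the dataset's folderPath and fileName settings as @dataset().pathparam and @dataset().fileparam, respectively.
Then, in the pipeline code above, add a new "parameters" section to each dataset reference that uses this dataset. Set pathparam to the folder and fileparam to the child item you retrieved in the earlier activity.
Note: there is a bug where dataset names cannot contain spaces.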
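A parameterized version of the SrcLocalFile dataset might look like the following. This is a sketch under assumptions. It uses the legacy FileShare dataset type, which is what the FileSystemSource in the question's Copy activity reads from. The linked service name is a placeholder, because the question does not show it. pathparam and fileparam are the example parameter names suggested above.

```json
{
  "name": "SrcLocalFile",
  "properties": {
    "type": "FileShare",
    "linkedServiceName": {
      "referenceName": "<your-source-linked-service>",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "pathparam": {
        "type": "String"
      },
      "fileparam": {
        "type": "String"
      }
    },
    "typeProperties": {
      "folderPath": {
        "value": "@dataset().pathparam",
        "type": "Expression"
      },
      "fileName": {
        "value": "@dataset().fileparam",
        "type": "Expression"
      }
    }
  }
}
```

Because folderPath and fileName are now expressions, the dataset no longer points at one fixed file. Each activity that references it decides which file it means.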
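Inside the ForEach loop, the GetLastModifyfromFile activity could then pass the current child item's name to the dataset. The sketch below assumes the parameterized dataset described above. The folder value is a placeholder for whatever folder SrcLocalDir points at, because GetMetadata's childItems only return each item's name and type, not its full path. The policy block is omitted for brevity.

```json
{
  "name": "GetLastModifyfromFile",
  "type": "GetMetadata",
  "typeProperties": {
    "dataset": {
      "referenceName": "SrcLocalFile",
      "type": "DatasetReference",
      "parameters": {
        "pathparam": "<folder-read-by-SrcLocalDir>",
        "fileparam": {
          "value": "@item().name",
          "type": "Expression"
        }
      }
    },
    "fieldList": [
      "lastModified"
    ]
  }
}
```

The SrcLocalFile entry in CopyNewFiles' "inputs" array would take the same "parameters" block. That way, the file whose lastModified date passed the IfNewFile check is the same file that gets copied.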
This question, "azure - Loop through each file in folder directory and check date Azure Data Factory V2", is on Stack Overflow: https://stackoverflow.com/questions/53119503/