- Monitor the container input/landing
- .json files arrive in the format yy/mm/dd/myfile.json
- If the JSON file is valid --> move it to input/staging/.json
- If invalid --> copy it to input/rejected/.json

The function triggers multiple times, once for each subfolder, and the output folder ends up with 3 copies of the same file. How do I change the function so that it triggers only once and copies the file only once?
My __init__.py:

import logging
import azure.functions as func
import json
def main(myblob: func.InputStream, inputBlob: bytes, outputBlob1: func.Out[bytes], outputBlob2: func.Out[bytes]):
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes")
    # Read the contents of the input blob
    blob_content = myblob.read()
    processed_file = validateJSON(blob_content)  # returns True or False
    # if the blob passes JSON validation
    if processed_file:
        outputBlob1.set(myblob.read())
        logging.info(f"Blob copied to outputBlob1: {myblob.name}")
    else:
        outputBlob2.set(myblob.read())
        logging.info(f"Blob copied to outputBlob2: {myblob.name}")

# func to validate json data (not the file!)
def validateJSON(jsonData):
    try:
        json.loads(jsonData)
    except ValueError as err:
        return False
    return True
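A side note on the code above: it calls `myblob.read()` once into `blob_content` and then again inside `outputBlob1.set(myblob.read())` / `outputBlob2.set(myblob.read())`. `func.InputStream` is a one-pass stream, so the second `read()` typically returns empty bytes and the copied blob would be empty even if the trigger fired only once. A minimal sketch of that behavior, using `io.BytesIO` as a stand-in for the bound stream:

```python
# After the first read() the cursor sits at end-of-stream, so a second
# read() returns b"". io.BytesIO stands in here for func.InputStream,
# which exposes the same file-like, one-pass read semantics.
import io

blob = io.BytesIO(b'{"key": "value"}')

first = blob.read()   # full payload
second = blob.read()  # empty: the stream is already exhausted

print(first)   # b'{"key": "value"}'
print(second)  # b''
```

Reusing the already-read `blob_content` (as the accepted answer below does) avoids this.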
My function.json file:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "myblob",
"type": "blobTrigger",
"direction": "in",
"path": "input/landing/{name}",
"connection": "mystorageaccount"
},
{
"name": "inputBlob",
"type": "blob",
"dataType": "binary",
"direction": "in",
"path": "input/landing/{name}",
"connection": "mystorageaccount"
},
{
"name": "outputBlob1",
"type": "blob",
"dataType": "binary",
"direction": "out",
"path": "input/staging/{rand-guid}.json",
"connection": "mystorageaccount"
},
{
"name": "outputBlob2",
"type": "blob",
"dataType": "binary",
"direction": "out",
"path": "input/regected/{rand-guid}.json",
"connection": "mystorageaccount"
}
]
}
My terminal output:
[2023-07-08T14:44:03.452Z] Host lock lease acquired by instance ID '000000000000000000000000FA91B3A1'.
[2023-07-08T14:46:27.618Z] Executing 'Functions.BlobTrigger1' (Reason='New blob detected(LogsAndContainerScan): input/landing/2023/07',
[2023-07-08T14:46:28.031Z] Python blob trigger function processed blob
Name: input/landing/2023/07
Blob Size: None bytes
[2023-07-08T14:46:28.164Z] Blob copied to outputBlob2: input/landing/2023/07
[2023-07-08T14:46:28.282Z] Executing 'Functions.BlobTrigger1' (Reason='New blob detected(LogsAndContainerScan): input/landing/2023/07/08',
[2023-07-08T14:46:28.485Z] Python blob trigger function processed blob
Name: input/landing/2023/07/08
Blob Size: None bytes
[2023-07-08T14:46:28.500Z] Blob copied to outputBlob2: input/landing/2023/07/08
[2023-07-08T14:46:28.991Z] Executed 'Functions.BlobTrigger1' (Succeeded, Id=6a6e5f58-b49e-46c9-a019-c8814c87e5fb, Duration=1656ms)
[2023-07-08T14:46:29.166Z] Executed 'Functions.BlobTrigger1' (Succeeded, Id=cfe1f858-fe5e-46cd-85fd-281fff7a0204, Duration=1057ms)
[2023-07-08T14:46:29.330Z] Executing 'Functions.BlobTrigger1' (Reason='New blob detected(LogsAndContainerScan): input/landing/2023/07/08/invalidJSON.json', Id=5a81c13f-b633-4be1-bdac-7281389f4403)
[2023-07-08T14:46:29.629Z] Python blob trigger function processed blob
Name: input/landing/2023/07/08/invalidJSON.json
Blob Size: None bytes
[2023-07-08T14:46:29.629Z] Blob copied to outputBlob2: input/landing/2023/07/08/invalidJSON.json
[2023-07-08T14:46:30.211Z] Executed 'Functions.BlobTrigger1' (Succeeded, Id=5a81c13f-b633-4be1-bdac-7281389f4403, Duration=1157ms)
Result: multiple copies
Best Answer
Azure blob trigger python function executes multiple times for each subfolder and creates multiple copies of the file
I have reproduced this in my environment, and the code below works for me:
function.json:
{
"bindings": [
{
"name": "myblob",
"path": "samples-workitems/land/{name}",
"connection": "AzureWebJobsStorage",
"direction": "in",
"type": "blobTrigger"
},
{
"name": "outputBlob1",
"direction": "out",
"type": "blob",
"connection": "AzureWebJobsStorage",
"path": "samples-workitems/approved/{rand-guid}.json"
},
{
"name": "outputBlob2",
"direction": "out",
"type": "blob",
"connection": "AzureWebJobsStorage",
"path": "samples-workitems/rejected/{rand-guid}.json"
}
]
}
__init__.py:
import logging
import azure.functions as func
import json

def main(myblob: func.InputStream, outputBlob1: func.Out[bytes], outputBlob2: func.Out[bytes]):
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes")
    blob_content1 = myblob.read()
    processed_file = validateJSON(blob_content1)  # returns True or False
    # if the blob passes JSON validation
    if processed_file:
        outputBlob1.set(blob_content1)
        logging.info(f"Blob copied to outputBlob1: {myblob.name}")
    else:
        outputBlob2.set(blob_content1)
        logging.info(f"Blob copied to outputBlob2: {myblob.name}")

# func to validate json data (not the file!)
def validateJSON(jsonData1):
    try:
        json.loads(jsonData1)
    except ValueError:
        return False
    return True
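The `validateJSON` helper can be exercised on its own. Note that `json.JSONDecodeError` is a subclass of `ValueError`, so the `except ValueError` clause catches parse failures:

```python
import json

def validateJSON(data):
    """Return True if `data` parses as JSON, False otherwise."""
    try:
        json.loads(data)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return False
    return True

print(validateJSON(b'{"id": 1, "name": "test"}'))  # True
print(validateJSON(b'{"id": 1,'))                  # False: truncated JSON
print(validateJSON(b'not json at all'))            # False
```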
Output:

If successful: (screenshot of the staging container)

If rejected: (screenshot of the rejected container)
This is the code and process that worked for me. Try changing your function.json (I see 4 bindings there; reduce them to 3) and your __init__.py file (why are you using inputBlob? remove it). Change your code accordingly and you will get the desired output.
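As an aside (my own suggestion, not part of the answer above): the terminal output shows the trigger also firing for the virtual directory entries `2023/07` and `2023/07/08`. If those still cause spurious executions after removing the extra binding, the trigger path can be narrowed so that only blobs ending in `.json` fire it, e.g.:

```json
{
  "name": "myblob",
  "type": "blobTrigger",
  "direction": "in",
  "path": "input/landing/{name}.json",
  "connection": "AzureWebJobsStorage"
}
```

With this pattern, `{name}` binds to everything between `input/landing/` and the `.json` suffix, and blobs without that suffix are ignored.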
Regarding "python - Azure blob trigger python function executes multiple times for each subfolder and creates multiple copies of the file", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/76643565/