我正在本地尝试 Azure 函数的 Azure Blob 绑定(bind),但运行示例代码时出现错误:
主机.json
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[3.3.0, 4.0.0)"
},
"extensions": {
"blobs": {
"maxDegreeOfParallelism": 4
}
}
}
init.py
import logging
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="BlobOutput1")
@app.route(route="file")
@app.blob_input(arg_name="inputblob",
path="sample-workitems/test.txt",
connection="blob__serviceUri")
@app.blob_output(arg_name="outputblob",
path="newblob/test.txt",
connection="blob__serviceUri")
def main(req: func.HttpRequest, inputblob: str, outputblob: func.Out[str]):
logging.info(f'Python Queue trigger function processed {len(inputblob)} bytes')
outputblob.set(inputblob)
return "ok"
我正在 VS Code 中运行代码。 我已准备好所有先决条件,但出现此错误:
[2023-05-12T23:12:53.216Z] Worker failed to load function: 'test' with functionId: '411ec582-2199-44a3-ac80-3e1be543cc33'.
[2023-05-12T23:12:53.219Z] Result: Failure
Exception: TypeError: <azure.functions.decorators.function_app.FunctionBuilder object at 0x000001AA650B3070> is not a module, class, method, or function.
Stack: File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.10/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 376, in _handle__function_load_request
self._functions.add_function(
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.10/WINDOWS/X64\azure_functions_worker\functions.py", line 333, in add_function
annotations = typing.get_type_hints(func)
File "C:\Users\2643894\AppData\Local\Programs\Python\Python310\lib\typing.py", line 1856, in get_type_hints
raise TypeError('{!r} is not a module, class, method, '
.
任何帮助将不胜感激。
请提供修复此错误的解决方案。
最佳答案
首先,我在存储帐户中创建了一个容器并上传了一个 blob,如下所示,
然后,我在存储帐户中创建了另一个容器并上传了一个 blob,
当我从 Azure 门户中的第一个容器下载 blob 时,它如下所示,
我从 Azure 门户中的第二个容器下载了 blob,如下所示,并且 blob 中没有文本:
源代码来自MS Doc .
import azure.functions as func
import logging
app = func.FunctionApp()
@app.function_name(name="BlobOutput1")
@app.route(route="file")
@app.blob_input(arg_name="inputblob",
path="sample-workitems/samplekam.txt",
connection="AzureWebJobsStorage")
@app.blob_output(arg_name="outputblob",
path="newblob/demokam.txt",
connection="AzureWebJobsStorage")
def main(req: func.HttpRequest, inputblob: str, outputblob: func.Out[str]):
logging.info(f'Python Queue trigger function processed {len(inputblob)} bytes')
outputblob.set(inputblob)
return "ok"
local.setting.json:
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "<storage-connction-string>",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
}
}
运行 Blob 存储触发函数,如下所示:-
单击fn + f5或单击运行>调试,它成功运行,如下所示:
输出:
通过上面输出的 URL,我在浏览器中得到了输出,
然后,我从 Azure 门户的第二个容器下载了 blob,如下所示,
并且 blob 内容看起来像第一个容器中的 blob,
在 VS 代码中创建 Blob 存储触发器函数的步骤如下,
我在我的 Vs Code 中打开了一个文件夹,并创建了一个新的函数触发器,如下所示:-
单击“创建函数”,然后选择文件夹,
它将创建如下所示的项目结构,
然后,我创建了一个 Azure Blob 存储触发器函数,如下所示,
它将在function_app.py中创建一个Blob存储触发函数代码,
然后复制并粘贴 MS Doc 中的源代码在 function_app.py 中,如下所示,
我的 local.settings.json:-
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "your-connection-string",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
}
}
运行 Blob 存储触发函数,如下所示:-
点击fn + f5或点击运行>调试,会提示连接存储帐户,
单击“连接存储帐户”,
从列表中选择存储帐户,或者您可以创建一个新的存储帐户,
再次单击fn + f5或单击运行>调试,它成功运行,如下所示: 输出:
通过上面输出的 URL,我在浏览器中得到了输出,
然后,我从 Azure 门户的第二个容器下载了 blob,如下所示,
并且 blob 内容看起来像第一个容器中的 blob,
关于Azure 函数 (Python) 和 Blob 存储绑定(bind)错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76240442/