python - 如何使用 EventHubConsumerClient 类运行异步 Python Event-Hub 触发器 Azure Functions?

标签 python azure azure-functions python-asyncio azure-eventhub

我正在尝试开发一个事件中心触发器azure函数,它可以从第一个事件中心接收事件并将这些事件发送到第二个事件中心。 作为附加功能我希望我的函数是异步的并使用 Azure Blob 存储中的存储检查点。 为此,我想使用 azure-eventhub 库的 EventHubConsumerClient 类( https://pypi.org/project/azure-eventhub/https://learn.microsoft.com/en-us/javascript/api/@azure/event-hubs/eventhubconsumerclient?view=azure-node-latest )

但是,当我在 VSCode 上本地测试该函数时,似乎一开始就无法接收到事件。

我正在监听的事件中心有两个分区。其共享访问策略设置为发送和监听。 我有一个小脚本可以向他发送消息进行测试,效果很好。 我的 Azure 函数运行时是 4.x,使用 python 3.9.13,并在本地使用 conda 基础。

这是我的函数代码,用于在 init.py 中使用 EventHubConsumerClient 类接收事件:

import logging
import asyncio
import os
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import azure.functions as func

CONNECTION_STR = os.environ.get("EVENT_HUB_CONN_STR")
EVENTHUB_NAME = os.environ.get("EVENT_HUB_NAME")
STORAGE_CONNECTION_STR = os.environ.get("AZURE_STORAGE_CONN_STR")
BLOB_CONTAINER_NAME = os.environ.get("AZURE_STORAGE_NAME")


async def on_event(partition_context, event):
    logging.info("Received event with body: {} from partition: {}.".format(event.body_as_str(encoding="UTF-8"), partition_context.partition_id))
    await partition_context.update_checkpoint(event)


async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )


async def main(one_event: func.EventHubEvent):
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group="$Default",
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store,
    )
    async with client:
        await receive(client)


if __name__ == '__main__':
    asyncio.run(main())

来源:https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_store_async.py

注意:我知道 main 中的 one_event 没有在主代码中使用,但我希望他充当运行 main 的触发器。

我的 function.json 文件是:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "one_event",
      "direction": "in",
      "eventHubName": "<My_event_hub_name>",
      "connection": "<My_event_hub_co_str>",
      "cardinality": "one",
      "consumerGroup": "$Default"
    }
  ]
}

我在其中定义了一个事件中心输入绑定(bind)以用作触发器。

我还有一个 local.settings.json,其中包含一些变量和requirements.txt,它似乎不缺少任何库。

仅供引用:我测试了另一种方法(此处: https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-event-hubs-trigger?tabs=in-process%2Cfunctionsv2%2Cextensionv5&pivots=programming-language-python )(不使用 EventHubConsumerClient 类)来接收事件,它工作正常,但我没有检查点和异步功能。

在使用“func start”在本地运行该函数时,我没有接收和打印有关已接收事件的一些基本信息,而是在终端中连续打印了大量消息。 它继续打印并锁定我的终端,因此我必须手动终止它并创建一个新终端。

看来我的代码无法正常工作。

*我可能搞乱了 main() 和 asyncio.run() 方法。 * 您知道问题可能是什么吗? 非常感谢!

最佳答案

我不是 Python 专家,但在概念层面上我可以看出,当使用常规事件中心触发器时,checkpointing still takes places ,使用a storage account :

AzureWebJobsStorage

The Azure Functions runtime uses this storage account connection string for normal operation. Some uses of this storage account include key management, timer trigger management, and Event Hubs checkpoints. The storage account must be a general-purpose one that supports blobs, queues, and tables

在引擎盖下,触发器使用 EventProcessorHost ,它类似于 EventHubConsumerClient (我想 azure 函数运行时很快就会更新以也使用 EventHubConsumerClient)。

所以,我不确定您想要实现什么目标。似乎您已将事件中心触发功能与您自己的事件中心监听器结合起来。您正在使用的 EventHubConsumerClient 将等待新的事件中心消息到达并阻止进一步执行,直到明确停止为止。这对于 azure 函数来说是行不通的,它的执行时间应该很短,默认限制为 5 分钟。例如,如果您有一个连续运行的 Azure Web 作业,则使用 EventHubConsumerClient 是有意义的。

I'm trying to develop an event-hub trigger azure function that could receive events from a first event-hub and send these events to a second event-hub.

我想说你需要一个 event hub triggered functionEvent Hub output binding将消息从一个事件中心传递到另一个事件中心。

关于python - 如何使用 EventHubConsumerClient 类运行异步 Python Event-Hub 触发器 Azure Functions?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75200195/

相关文章:

azure - 使用 ARM 的函数应用程序上的运行时白名单 APIM IP

python - 内核统计的最大值-python

c# - 识别 Azure ACS 中的角色

python - 在 python 中替换它们之前存储定界符和定界符位置

Azure SQL PaaS 和 Azure Policy 交互

c# - Azure 上的 Microsoft.Glee.GraphViewerGdi.GViewer 引发异常

azure - 如何限制/停止公共(public)(互联网)访问Azure功能?

c# - 将所有子域添加到 CORS azure 功能

python - 如何在Python中使用sqlalchemy在查询中创建sql server表变量

python - 添加路径后未找到模块错误