python - 适用于 python 的 Azure eventhub 库

标签 python azure azure-eventhub

我正在使用 eventhub 来摄取大量事件。我有多个消费者正在一个扩展组后面运行,从具有多个分区的 eventhub 读取这些事件。我正在使用 python 浏览 Azure SDK,但对于使用什么感到困惑。有eventhubconsumerclient、eventprocessorHost ....

我想使用一个库,我的多个消费者可以使用消费者组进行连接,分区动态分配给每个消费者,并在存储帐户中进行检查点,就像我使用 kafka 的方式一样。

最佳答案

更新:

对于生产使用,我建议您应该使用稳定版本的 event hub sdk。可以使用eph,示例代码为here .

<小时/>

我可以使用pre-release eventhub 5.0.0b6使用消费者组并设置检查点。

但奇怪的是,在 blob 存储中,我可以看到为 eventhub 创建的 2 个文件夹:checkpointownership 文件夹。在文件夹内,为分区创建了 blob,但 blob 是空的。更奇怪的是,即使blob是空的,每次我从eventhub读取时,它总是读取最新的数据(意味着它永远不会读取同一消费者组中已经读取的数据)。

您需要安装azure-eventhub 5.0.0b6并使用 pip install --pre azure-eventhub-checkpointstoreblob 安装 azure-eventhub-checkpointstoreblob。对于 blob 存储,您应该安装最新的 version 12.1.0 of azure-storage-blob .

我关注这个sample 。在此示例中,它使用事件中心级别连接字符串(不是事件中心命名空间级别连接字符串)。您需要通过 nav 创建一个事件中心级别连接字符串到 azure 门户 -> 您的 eventhub 命名空间 -> 您的事件中心实例 -> 共享访问策略 -> 单击“添加” -> 然后指定策略名称,然后选择权限。如果您只想接收数据,则只能选择监听权限。截图如下:

enter image description here

创建策略后,您可以复制连接字符串,如下图所示:

enter image description here

然后您可以按照以下代码操作:

import os
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

CONNECTION_STR = 'Endpoint=sb://ivanehubns.servicebus.windows.net/;SharedAccessKeyName=saspolicy;SharedAccessKey=xxx;EntityPath=myeventhub'
STORAGE_CONNECTION_STR = 'DefaultEndpointsProtocol=https;AccountName=xx;AccountKey=xxx;EndpointSuffix=core.windows.net'


def on_event(partition_context, event):
    # do something with event
    print(event)
    print('on event')
    partition_context.update_checkpoint(event)


if __name__ == '__main__':

    #the "a22" is the blob container name
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, "a22")

    #the "$default" is the consumer group
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR, "$default", checkpoint_store=checkpoint_store)

    try:
        print('ok')
        client.receive(on_event)
    except KeyboardInterrupt:
        client.close()

测试结果:

enter image description here

关于python - 适用于 python 的 Azure eventhub 库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59188944/

相关文章:

python - 什么是 UML 中的(python-)模块的等价物

Python:将列表分成变量

python - Google Colab - Spotipy 没有将我重定向到指定的redirect_uri

python - 无法正确识别 4 维解空间中的 Pareto 观测值

azure - 访问 Azure Keyvault 时出现问题 - DefaultAzureCredential 无法检索 token

java - JBOSS 上的此 URL 不支持 HTTP 方法 POST

azure - 目前,在 VS Code 中,我的 Azure 资源仅显示 Functions App 而没有其他内容(例如,它应该显示的其他内容是 Cosmos DB、VM)

asp.net-mvc - 事件中心错误 : Exception: Put token failed. 状态代码:401 未经授权的 TrackingId

c# - 从 EventProcessorHost 中获取数据

azure - RabbitMQ 到 Azure 事件中心 : AMQP 0. 9.1 与 AMQP 1.0 的兼容性