我正在使用 eventhub 来摄取大量事件。我有多个消费者正在一个扩展组后面运行,从具有多个分区的 eventhub 读取这些事件。我正在使用 python 浏览 Azure SDK,但对于使用什么感到困惑。有eventhubconsumerclient、eventprocessorHost ....
我想使用一个库,我的多个消费者可以使用消费者组进行连接,分区动态分配给每个消费者,并在存储帐户中进行检查点,就像我使用 kafka 的方式一样。
最佳答案
更新:
对于生产使用,我建议您应该使用稳定版本的 event hub sdk。可以使用eph,示例代码为here .
<小时/>我可以使用pre-release eventhub 5.0.0b6使用消费者组并设置检查点。
但奇怪的是,在 blob 存储中,我可以看到为 eventhub 创建的 2 个文件夹:checkpoint 和 ownership 文件夹。在文件夹内,为分区创建了 blob,但 blob 是空的。更奇怪的是,即使blob是空的,每次我从eventhub读取时,它总是读取最新的数据(意味着它永远不会读取同一消费者组中已经读取的数据)。
您需要安装azure-eventhub 5.0.0b6并使用 pip install --pre azure-eventhub-checkpointstoreblob
安装 azure-eventhub-checkpointstoreblob。对于 blob 存储,您应该安装最新的 version 12.1.0 of azure-storage-blob .
我关注这个sample 。在此示例中,它使用事件中心级别连接字符串(不是事件中心命名空间级别连接字符串)。您需要通过 nav 创建一个事件中心级别连接字符串到 azure 门户 -> 您的 eventhub 命名空间 -> 您的事件中心实例 -> 共享访问策略 -> 单击“添加” -> 然后指定策略名称,然后选择权限。如果您只想接收数据,则只能选择监听权限。截图如下:
创建策略后,您可以复制连接字符串,如下图所示:
然后您可以按照以下代码操作:
import os
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
CONNECTION_STR = 'Endpoint=sb://ivanehubns.servicebus.windows.net/;SharedAccessKeyName=saspolicy;SharedAccessKey=xxx;EntityPath=myeventhub'
STORAGE_CONNECTION_STR = 'DefaultEndpointsProtocol=https;AccountName=xx;AccountKey=xxx;EndpointSuffix=core.windows.net'
def on_event(partition_context, event):
# do something with event
print(event)
print('on event')
partition_context.update_checkpoint(event)
if __name__ == '__main__':
#the "a22" is the blob container name
checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, "a22")
#the "$default" is the consumer group
client = EventHubConsumerClient.from_connection_string(
CONNECTION_STR, "$default", checkpoint_store=checkpoint_store)
try:
print('ok')
client.receive(on_event)
except KeyboardInterrupt:
client.close()
测试结果:
关于python - 适用于 python 的 Azure eventhub 库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59188944/