python - Azure Eventhub (Python) : checkpointing with blob storage - keyerror issue in EventProcessor when checkpointing is enabled

标签 python azure azure-eventhub azure-blob-storage keyerror

我在 eventhub 中遇到了 Blob 存储检查点问题。如果我在获取消费者客户端时没有设置 checkpoint_store,我的应用程序运行正常。每当我尝试设置 checkpoint_store 变量并运行我的代码时,它都会引发以下异常:

EventProcessor instance 'xxxxxxxxxxx' of eventhub <name of my eventhub> consumer group <name of my consumer group>. An error occurred while load-balancing and claiming ownership. The exception is KeyError('ownerid'). Retrying after xxxx seconds

我能找到的唯一提到这种错误的 github 条目是 this one ,但是问题本身从未得到解决,有问题的人最终使用了不同的库。

我正在使用的相关库是 azure-eventhub 和 azure-eventhub-checkpointstoreblob-aio

以下是我正在使用的代码的相关片段 ( I used this tutorial as a guide ):

import asyncio
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub import EventData
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context, event):
    await partition_context.update_checkpoint(event)
    #<do stuff with event data>
checkpoint_store = BlobCheckpointStore.from_connection_string(blob_connection_string, container_name)
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=input_eventhub_name, checkpoint_store=checkpoint_store)

async def main():
  async with client:
    await client.receive(
      on_event=on_event,
    )
    print("Terminated.")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

问题似乎仅与 Blob 存储检查点有关;如果我在创建消费者客户端时注释掉“checkpoint_store=checkpoint_store”,则一切运行都不会出现问题。

与 Blob 存储的连接看起来很好,因为我进行了一些挖掘,发现在 Blob 存储中创建了一些文件夹“检查点”和“所有权”: blob storage snapshot 后者包含一些元数据中带有“ownerid”的文件: owner files metadata

即 key 肯定存在。我认为正在发生的情况是,EventProcessor 正在尝试获取这些 blob 的所有权元数据,但不知何故未能成功。如果有人知道如何解决这个问题,我将非常感激!

最佳答案

这看起来像是从 Blob 之一检索“ownerid”时出现的问题。您能帮我测试一下这些场景吗?

  1. 从 Blob 容器中删除所有内容并重试。
  2. 如果问题仍然存在,您能否检查每个 blob 是否都具有元数据“ownerid”?
  3. 如果问题仍然存在,您能否将库 azure-eventhub-checkpointstoreblob-aio 版本 1.1.0 中文件 azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio.py 的第 144 行替换为以下内容,然后重试?
"owner_id": blob.metadata.get("ownerid"),

关于python - Azure Eventhub (Python) : checkpointing with blob storage - keyerror issue in EventProcessor when checkpointing is enabled,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63354884/

相关文章:

python - BeautifulSoup错误消息?

azure - 在 terraform 的 azure data explorer 中使用 eventhub 的默认消费者组

python - 在没有您的用户名和密码的情况下写入公开的 Google 电子表格

python - 属性错误 : Class Instance has no __call__ method

azure - 使用 SSH 公钥的 HDInsight 群集如何对多个用户可见?

azure - 使用同名 DLL 处理多个 Azure Functions

azure - 使用同一帐户 + Azure 应用程序网关的多个用户登录的 session 值更改

Azure数据工厂将EnqueuedTimeUtc转换为Unix时间戳

python-3.x - 使用 AMQP 从 Azure Eventhub 接收事件

python - 使用换行符作为 numpy.savetxt 的分隔符不起作用