python - 有没有办法将 azure Eventhub 检查点存储到远程存储桶(例如 Google 云存储桶)?

标签 python azure google-cloud-platform google-cloud-storage azure-eventhub

由于使用案例,我想将检查点从 Azure 事件中心存储到 Google 云平台存储桶,但我无法找到执行此操作的方法。

根据我对 eventhub 检查点的研究,我发现创建了一个 checkpoint_store 对象,该对象依赖于 Azure blob 存储。代码已在下面共享-

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore


async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event,  starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())

问题- 如何更改此方法以便能够将检查点存储到 Google 云存储桶,以便我的 eventhub 客户端在发生故障时可以从此检查点读取数据?

引用链接-

  1. Receive events
  2. Checkpoint store python client

最佳答案

消费者客户端与特定数据存储或 Azure 没有关联;它通过提供的检查点存储抽象来执行任何需要存储使用的操作。要将处理器与 Google 云存储桶结合使用,您必须实现自定义检查点存储并将其传递给您的消费者。

SDK提供了一个摘要CheckpointStore定义消费者期望的接口(interface)的类型。还有一个in_memory_checkpoint_store作为一个简化的示例,它可能有助于您入门。

关于python - 有没有办法将 azure Eventhub 检查点存储到远程存储桶(例如 Google 云存储桶)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68097284/

相关文章:

python - Numpy:高级索引的转置结果

python - 获取 numpy 以警告整数溢出

google-cloud-platform - 无法通过 Cloud SQL 代理连接到 MySQL

javascript - 如何解密 BigQuery 中的列?

python - 在 Django 中运行并与后台进程通信

azure - 如何更改管道变量以供在 Azure DevOps 的下一个构建中使用

azure - 我可以使用 Azure CLI 执行与 Azure Powershell 相同的操作吗? (上传VHD)

linux - Azure Batch 使用哪种 "command line"来执行 Ubuntu 上的启动任务?

google-app-engine - 应用引擎 : I'm seeing 500 status and 204 error code

python - 如何按元素检查多个 pandas DataFrame.Series 的条件并将结果应用于新列?