.net-core - 如何将 EventProcessorClient 配置为仅读取特定分区键(而不是分区 ID)的事件?

标签 .net-core azure-eventhub azure-sdk-.net

我有一个带有 2 个分区的事件中心,并使用以下代码使用不同的分区键向其发送事件(基于 https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs 中的文档)。我正在使用适用于 .NET 的 Azure.Messaging.EventHubs 库(带有 .net core 3.1)

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionA" });

    eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("First")));
    eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Second")));
    await produce.SendAsync(eventBatch);

    using EventDataBatch eventBatch2 = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionB" });

    eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Third")));
    eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Fourth")));

    await producer.SendAsync(eventBatch2);
}

如您所见,我使用分区键作为 MyPartitionA 发送了包含 2 个事件的第一批,使用分区键作为 MyPartitionB 发送了包含 2 个事件的第二批。有趣的是,来自两个分区键的事件进入同一分区(即事件中心上的分区 0)。

在接收端,我尝试使用 https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs.Processor#start-and-stop-processing 处的代码示例如下所示(我正在使用适用于.NET的Azure.Messaging.EventHubs.Processor库。)

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event
        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error
        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }   
}

private async Task ProcessUntilCanceled(CancellationToken cancellationToken)
{
    var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
    var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

    processor.ProcessEventAsync += processEventHandler;
    processor.ProcessErrorAsync += processErrorHandler;
    
    await processor.StartProcessingAsync();
    
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
        
        await processor.StopProcessingAsync();
    }
    finally
    {
        // To prevent leaks, the handlers should be removed when processing is complete
        processor.ProcessEventAsync -= processEventHandler;
        processor.ProcessErrorAsync -= processErrorHandler;
    }
}

我在上面的代码中找不到一种方法来仅接收给定分区(例如 MyPartitionA)的事件,而不接收来自其他分区(例如 MyPartitionB)的事件。

  1. 是否可以注册处理器以接收基于特定分区键(而不是分区 ID)的事件?
  2. 如果具有分区键 MyPartitionA 和 MyPartitionB 的事件均发送到事件中心中的分区 0,是否仍然可以仅接收单个分区键(例如 MyPartitionA)的事件,而不接收不具有相同分区的其他事件key,即使它们可能驻留在事件中心的同一分区中?

最佳答案

您无法使用 SDK 中的任何客户端读取基于分区键的事件。

分区键是一个综合概念,事件发布后不会保留该概念。当您使用分区键进行发布时,该键将被散列,结果值用于选择将事件路由到的分区;其目的是确保相关事件被路由到同一个分区,但不需要了解选择了哪个分区,并且不提供任何公平分配的保证。

要完成您想要执行的过滤,您需要将分区键存储为 application property事件,然后将该值用作 ProcessEventAsync 处理程序中的过滤器。请注意,您将接收来自所有分区的所有事件 - 这是 EventProcessorClient 的主要目标。

我认为我们对您的应用场景的背景了解不够,无法帮助确定最佳方法,但根据我们所知道的情况,我建议考虑替代方案。由于您似乎需要显式读取一组事件,因此使用其 Id 而不是 key 发布到众所周知的分区可能会有所帮助。然后,您将能够使用 EventHubConsumerClient::ReadEventsFromPartitionAsync 专门从该分区读取事件。方法。当然,这还需要您显式控制应用程序中发布其他事件的位置,以确保它们被路由到您的第二个分区。

关于.net-core - 如何将 EventProcessorClient 配置为仅读取特定分区键(而不是分区 ID)的事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63762483/

相关文章:

apache-spark - 将包含 JSON 字符串的列拆分为每个包含字符串中的一个键值对的列

c# - HDInsight SDK [用于Hadoop的Microsoft .NET SDK]

mysql - MySql 中多对多关系的问题

c# - EventHubs Azure 上的时间序列数据

odbc - AWS RedShift - .NET Core(ODBC 支持?)

azure - Elk Logstash 无法连接到事件中心 Azure

azure-storage - 从 Uri 和连接字符串创建 Azure BlobClient

azure - 将 Sitefinity 6.3(Windows Azure Webproject)升级到 Sitefinity 7.0 后出现问题

asp.net-core - WebHost 从 appsettings.json 读取哪些值

c# - 无法为基本属性翻译 LINQ 表达式