c# - 将消息定向给消费者

标签 c# azure azure-eventhub

我的客户端正在尝试向接收者发送消息。但是我注意到接收者有时没有收到客户端发送的所有消息,因此丢失了一些消息(不确定问题出在哪里?客户端还是接收者)。 关于为什么会发生这种情况的任何建议。这就是我目前正在做的事情

在接收方,这就是我正在做的事情。

这是事件处理器

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            }
        }

这是客户端连接到事件中心的方式

var StrBuilder = new EventHubsConnectionStringBuilder(eventHubConnectionString)
{
 EntityPath = eventHubName,
};
this.eventHubClient = EventHubClient.CreateFromConnectionString(StrBuilder.ToString());

如何将消息定向给特定消费者

最佳答案

我正在使用来自 eventhub 官方文档的示例代码,用于 sendingreceiving .

我有 2 个消费者组:$Defaultnewcg。假设您有 2 个客户端,client_1 使用默认消费组($Default),client_2 使用另一个消费组(newcg)

首先,创建发送客户端后,在 SendMessagesToEventHub 方法中,我们需要添加一个带有值的属性。该值应该是消费者组名称。示例代码如下:

    private static async Task SendMessagesToEventHub(int numMessagesToSend)
    {
        for (var i = 0; i < numMessagesToSend; i++)
        {
            try
            {
                var message = "444 Message";
                Console.WriteLine($"Sending message: {message}");
                EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));

                //here, we add a property named "cg", it's value is the consumer group. By setting this property, then we can read this message via this specified consumer group.
                mydata.Properties.Add("cg", "newcg");

                await eventHubClient.SendAsync(mydata);

            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
            }

            await Task.Delay(10);
        }

        Console.WriteLine($"{numMessagesToSend} messages sent.");
    }

然后在client_1中,创建接收器项目后,它使用默认消费者组($Default) -> 在SimpleEventProcessor类中->ProcessEventsAsync方法,我们可以过滤掉不需要的事件数据。 ProcessEventsAsync 方法的示例代码:

        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                //filter the data here
                if (eventData.Properties["cg"].ToString() == "$Default")
                {                    
                    var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);

                    Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
                    Console.WriteLine(context.ConsumerGroupName);
                }
            }

            return context.CheckpointAsync();
        }

在另一个客户端中,例如 client_2,它使用另一个消费者组,例如其名称为 newcg,我们可以按照 client_1 中的步骤进行操作,只需在 ProcessEventsAsync 中进行一些更改> 方法如下:

            public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                foreach (var eventData in messages)
                {
                    //filter the data here, using another consumer group name
                    if (eventData.Properties["cg"].ToString() == "newcg")
                    {  
                       //other code
                    }
                   }

                 return context.CheckpointAsync();
               }

关于c# - 将消息定向给消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59014664/

相关文章:

azure - 如何从 Eventhub 中删除事件

c# - WPF 延迟绑定(bind)无法正常工作

C# Windows 应用程序 - 许多线程使用相同的连接?

c# - C# 中的静态链接?

c# - 我可以使用 LINQ 跳过集合并仅返回 100 条记录吗?

azure - 在 Azure Databricks 中反序列化事件中心消息

c# - 如何使用 C# 仅返回包含符号、字母和数字的大字符串中的数字

c# - 为什么我们不需要返回 `using` 范围之外的值?

python - Azure函数应用程序: Microsoft. Azure.WebJobs.EventHubs : Value cannot be null.(参数 'receiverConnectionString')

azureservicebus - 无法从服务总线资源管理器连接到事件中心