azure - Azure事件中心和多个使用者组

标签 azure publish-subscribe azure-eventhub

在以下情况下,需要有关使用Azure事件中心的帮助。我认为,在这种情况下,消费者群体可能是正确的选择,但我无法在网上找到具体的例子。

这是问题的粗略描述以及使用事件中心的建议解决方案(我不确定这是否是最佳解决方案。感谢您的反馈)



我有多个事件源,这些事件源会生成大量事件数据(来自传感器的遥测数据),这些数据需要保存到我们的数据库中,并且应该并行执行一些分析(例如运行平均值,最小-最大)。

发送方只能将数据发送到单个端点,但是事件中心应使该数据可用于两个数据处理程序。

我正在考虑使用两个使用者组,第一个使用者组是一组工作人员角色实例,负责将数据保存到我们的键值存储中,第二个使用者组将是一个分析引擎(可能与Azure流分析一起使用) )。

首先,我该如何设置消费者群体,在发送者/接收者方面我需要做些什么,以便事件的副本出现在所有消费者群体中?

我确实在线阅读了许多示例,但是它们要么使用client.GetDefaultConsumerGroup();和/或使所有分区由同一工作角色的多个实例处理。

在我的场景中,触发事件时,需要由两个不同的辅助角色并行处理该事件(一个用于保存数据,另一个用于进行分析)

谢谢!

最佳答案

TLDR:看起来很合理,只需在CreateConsumerGroupIfNotExists中使用不同的名称即可创建两个Consumer Group。

消费者组主要是一个概念,因此它们的确切工作方式取决于订户的实现方式。如您所知,从概念上讲,它们是一组一起工作的订户,以便每个组接收所有消息,并且在理想情况下(不会发生),每个消息可能消耗一次。这意味着每个消费者组将“具有由同一工作人员角色的多个实例处理的所有分区”。你要这个。

这可以以不同的方式实现。 Microsoft提供了两种直接使用来自事件中心的消息的方式,还提供了使用流分析之类的功能的选项,该选项可能建立在两种直接方式的基础上。第一种是Event Hub Receiver,第二种是更高的级别是Event Processor Host

我没有直接使用Event Hub Receiver,因此此特殊注释基于此类系统的工作原理以及文档中的推测:尽管它们是createdEventHubConsumerGroups用途,但这些目的无济于事,因为这些接收者无法协调彼此。如果使用这些,您将需要(并且可以!)自己完成所有偏移量的协调和提交,这在某些情况下具有优势,例如将偏移量与计算的聚合写入同一事务中的事务性数据库。使用这些low level receivers,使用同一个Azure使用者组使用不同的逻辑使用者组可能不应该(规范性的,不切实际的建议)特别有问题,但是您应该使用不同的名称,以防万一,或者您更改为EventProcessorHosts

现在,在更有用的信息上,EventProcessorHosts可能建立在EventHubReceivers之上。它们是更高级别的工具,并且支持使多台计算机作为逻辑使用者组一起工作。下面,我在代码中包含了一个经过轻松编辑的代码段,该代码段使EventProcessorHost带有一堆注释,可以解释一些选择。

//We need an identifier for the lease. It must be unique across concurrently 
//running instances of the program. There are three main options for this. The 
//first is a static value from a config file. The second is the machine's NETBIOS
//name ie System.Environment.MachineName. The third is a random value unique per run which
//we have chosen here, if our VMs have very weak randomness bad things may happen.

string hostName = Guid.NewGuid().ToString();

//It's not clear if we want this here long term or if we prefer that the Consumer 
//Groups be created out of band. Nor are there necessarily good tools to discover 
//existing consumer groups.
NamespaceManager namespaceManager = 
    NamespaceManager.CreateFromConnectionString(eventHubConnectionString);
EventHubDescription ehd = namespaceManager.GetEventHub(eventHubPath);
namespaceManager.CreateConsumerGroupIfNotExists(ehd.Path, consumerGroupName);

host = new EventProcessorHost(hostName, eventHubPath, consumerGroupName, 
    eventHubConnectionString, storageConnectionString, leaseContainerName);
//Call something like this when you want it to start
host.RegisterEventProcessorFactoryAsync(factory)


您会注意到,我告诉Azure创建一个新的Consumer Group(如果不存在),如果不存在,您会收到一条可爱的错误消息。老实说,我不知道这样做的目的是因为它不包括存储连接字符串,该字符串在各个实例之间必须相同,以便EventProcessorHost的协调(可能是提交)正常工作。

在这里,我提供了Azure Storage Explorer租约的图片,这些租约可能是我在11月尝试的一个消费类群体的抵消额。请注意,尽管我有一个testhub和一个testhub-testcg容器,但这是由于手动命名它们。如果它们在同一个容器中,则将是“ $ Default / 0”与“ testcg / 0”之类的东西。


如您所见,每个分区只有一个Blob。我的假设是这些斑点用于两件事。第一个是Blob租约,用于在实例之间分配分区,请参见here,第二个是将分区中已提交的偏移量存储。

消费实例不是在将数据推送到消费者组,而是在一个分区中向存储系统请求某个偏移量的数据。 EventProcessorHosts是具有逻辑使用者组的一种很好的高级方法,其中每个分区一次只能由一个使用者读取每个分区,并且不会忘记逻辑使用者组在每个分区中取得的进步。

请记住,每个分区的吞吐量是经过测量的,因此,如果要最大限度地提高入口,则只能有两个速度最快的逻辑使用者。因此,您需要确保具有足够的分区和吞吐量单位,您可以:


读取您发送的所有数据。
如果您由于问题而落后几个小时,请在24小时的保留期内赶上来。


结论:消费群体就是您所需要的。您阅读的使用特定使用者组的示例很好,每个逻辑使用者组中的Azure使用者组使用相同的名称,而不同的逻辑使用者组使用不同的使用者。

我尚未使用Azure流分析,但是至少在预览版本中,您是limited to the default consumer group。因此,请勿将默认的使用者组用于其他用途,如果您需要两个单独的Azure Stream Analytics,则可能需要做一些讨厌的事情。但是很容易配置!

关于azure - Azure事件中心和多个使用者组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27789320/

相关文章:

Java Publisher Server 聊天程序

Azure EventHub 事件格式

c# - Azure Service Fabric Actor 初始化

azure - 使用历史记录模式在 Azure 应用服务上部署 Vue CLI

c# - 有条件地操作 StackExchange.Redis 中的哈希依赖于一些同步

java - 即使应用程序关闭也尝试保持 MQTT 客户端连接

performance - Azure 流分析太慢 - 时间值也无关紧要

sql - 当我从 Azure 流作业插入数据时,如何在 CosmosDB 中使用 GUID 作为 ID

azure - 如何使用 Automation Runbook 重命名 Azure SQL 数据库?

asp.net - Azure 不会关闭自定义错误