Azure 事件中心 : de-mux and aggregate events from all partitions of eventhub using IEventProcessor

标签 azure akka-stream azure-eventhub

我有 1 个带有 2 个分区的 eventhub,我想聚合我的数据一分钟并将该数据保存到数据库,我正在使用 IEventProcessor 从 eventhub 读取事件。

我能够将数据按原样保存到数据库,但是当我聚合数据时,我每分钟得到 2 个条目,而不是 1 个。我认为原因是 IEventProcessor 运行两次,即每次针对 eventhub 中的分区。

有什么方法可以在从 eventhub 读取数据时实现一分钟的流数据聚合,然后保存到数据库吗? (我无法使用流分析,因为我有 protobuf 格式的数据。)

最佳答案

您可以使用Azure IoTHub React Java 和 Scala API,它提供了一个包含来自所有 EventHub 分区的事件的合并 react 流。

从您的角度来看,无论 EventHub 中的分区数量有多少,您都只会看到一个数据流,并且如果需要,您也可以选择分区的子集。

These samples展示 API 是如何工作的,它应该会让你的任务变得非常简单。您需要定义“Sink”,它将是将事件写入数据库的方法,并链接提供的“Source”,例如:

val eventHubRecords = IoTHub().source(java.time.Instant.now())

val myDatabase = Sink.foreach[MessageFromDevice] {
  m ⇒ MyDB.writeRecord(m)
}

eventHubRecords.to(myDatabase).run()

这里是configuration settings ,检查点支持 Cassandra 和 AzureBlob。

注意:该项目以 Azure IoT 命名,但您可以将其用于 EventHub,如果您有任何问题,请告诉我。

关于Azure 事件中心 : de-mux and aggregate events from all partitions of eventhub using IEventProcessor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42078355/

相关文章:

google-cloud-dataflow - 是否可以使用 Apache Beam/Google Cloud Dataflow 从 Azure EventHub 主题进行消费?

c# - 改进代码以根据上次消息日期从事件中心读取消息

c# - 在 Windows Azure 中存储 Blob 在数据库中存储附加数据,例如内容类型

c# - 绑定(bind)到 CosmosDB 输入时需要“Id”

azure - 使用 Terraform 和函数扩展版本 4 创建函数应用程序

sql-server - SQL Azure 仅返回 sys.dm_exec_sessions 中的一行

elasticsearch - 在 ES : index name is only evaluated on starting execution 中使用 Alpakka 索引的 Akka Streams

java - Akka grpc 可以消费非 Akka 服务的数据吗

prometheus - Akka Stream 和 Kamon-Prometheus 不返回任何指标但加载一个空页面

azure - 事件中心的入站 IP 是什么?