azure - 关于事件中心处理器中的检查点策略

标签 azure azure-eventhub event-processor-host

我使用事件中心处理器主机来接收和处理来自事件中心的事件。为了获得更好的性能,我每 3 分钟调用一次检查点,而不是每次接收事件时调用检查点:

public async Task ProcessEventAsync(context, messages)
{
 foreach (var eventData in messages)
 {
    // do something
 }

 if (checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
 {
     await context.CheckpointAsync();
 }
}

但问题是,如果没有新事件发送到事件中心,可能有些事件永远不会成为检查点,因为如果没有新消息,则不会调用 ProcessEventAsync。

有什么建议可以确保所有已处理的事件都进行检查点,但仍每隔几分钟检查点?

更新:根据 Sreeram 的建议,我更新了代码如下:

public async Task ProcessEventAsync(context, messages)
{
    foreach (var eventData in messages)
    {
     // do something    
    }

    this.lastProcessedEventsCount += messages.Count();

    if (this.checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
    {
        this.checkpointStopWatch.Restart();
        if (this.lastProcessedEventsCount > 0)
        {
            await context.CheckpointAsync();
            this.lastProcessedEventsCount = 0;
        }
    }
}

最佳答案

很好的案例 - 你正在报道!

您可能会丢失 event checkpoints (并因此 event replay )在以下两种情况下:

  1. 当您有稀疏数据流时(例如:每 5 分钟一批消息,检查点间隔为 3 分钟)且 EventProcessorHost实例由于某种原因关闭 - 您可以看到 2 minEventData - 重新处理。为了处理这个案子, 跟踪lastProcessedEvent完成后IEventProcessor.onEvents/IEventProcessor.ProcessEventsAsync当您收到关闭通知时和检查点 - IEventProcessor.onClose/IEventProcessor.CloseAsync .

  2. 可能存在这样的情况:- 特定 EventHubs partition 不再有任何事件。在这种情况下,您永远不会看到最后一个事件被检查点 - 您的 Checkpointing strategy 。然而,当您有连续的 EventData 流时,这种情况并不常见。并且您没有发送到特定的 EventHubs 分区 ( EventHubClient.send(EventData_Without_PartitionKey) )。如果您认为 - 您可能会遇到这种情况,请使用:

    EventProcessorOptions.setInvokeProcessorAfterReceiveTimeout(true); // in java or EventProcessorOptions.InvokeProcessorAfterReceiveTimeout = true; // in C#

唤醒processEventsAsync的标志每隔一段时间。然后,跟踪 LastProcessedEventDataLastCheckpointedEventData没有时判断是否检查点 Events已收到,基于EventData.SequenceNumber这些事件的属性。

关于azure - 关于事件中心处理器中的检查点策略,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51865942/

相关文章:

azure - 在 Kusto 中使用 union 和循环运算符将多个表合并为一个表

azure - 如何使用 Azure ARM 创建公共(public)虚拟机镜像?

python - 错误: "Your deployment does not have an associated swagger.json" - ACI deployment on Stream Analytics Job

Android 使用 Azure 移动服务删除 Null 异常

azure - 删除 eventhub 和删除命名空间有什么区别?

azure - Azure 事件中心和 Azure 服务总线之间的根本区别?

azure - Azure 事件中心每秒处理 50,000 个事件的分区数和吞吐量是多少?

azure - Microsoft.Azure.EventHubs.Processor 中的异常导致 CPU 阻塞

java - Azure 事件中心偏移