c# - 强制 EventProcessorHost 重新传递失败的 Azure 事件中心 eventData 到 IEventProcessor.ProcessEvents 方法

标签 c# .net azure azure-eventhub

该应用程序使用 .NET 4.6.1 和 Microsoft.Azure.ServiceBus.EventProcessorHost nuget package v2.0.2 ,以及它的依赖项 WindowsAzure.ServiceBus package v3.0.1处理 Azure 事件中心消息。

该应用程序具有 IEventProcessor 的实现.当从 ProcessEventsAsync 抛出未处理的异常时方法EventProcessorHost永远不会将这些消息重新发送到正在运行的 IEventProcessor 实例. (有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送。)

有没有办法强制导致异常的事件消息由 EventProcessorHost 重新发送?到 IEventProcessor执行?

此评论中针对几乎相同的问题提供了一种可能的解决方案:
Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync

该评论建议保留最后成功处理的事件消息的副本,并在 ProcessEventsAsync 中发生异常时使用该消息显式检查点。 .但是,在实现并测试了这样的解决方案后,EventProcessorHost仍然不重新发送。实现非常简单:

private EventData _lastSuccessfulEvent;

public async Task ProcessEventsAsync(
    PartitionContext context,
    IEnumerable<EventData> messages)
{
    try
    {
        await ProcessEvents(context, messages);     // does actual processing, may throw exception
        _lastSuccessfulEvent = messages
            .OrderByDescending(ed => ed.SequenceNumber)
            .First();
    }
    catch(Exception ex)
    {
        await context.CheckpointAsync(_lastSuccessfulEvent);
    }
}

行动中的事物分析:
enter image description here

部分日志示例可在此处获得:https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt

最佳答案

TLDR : 唯一可靠的方法将失败的一批事件重播到 IEventProcessor.ProcessEventsAsync是- Shutdown EventProcessorHost (又名 EPH)立即 - 使用 eph.UnregisterEventProcessorAsync()或通过 terminating the process - 视情况而定。这会让其他 EPH实例获取此分区的租约并从上一个检查点开始。

在解释这个之前 - 我想指出,这是一个 好问题 确实,这是我们必须为 EPH 做出的最艰难的设计选择之一.在我看来,这是一种权衡黑白:usability/supportabilityEPH框架,vs Technical-Correctness .

理想情况本来是:当用户代码在 IEventProcessorImpl.ProcessEventsAsync 中时抛出异常 - EPH图书馆不应该捕获这个。应该让这个Exception - 使进程和 crash-dump 崩溃清楚地显示了 callstack负责任的。我仍然相信-这是最technically-correct解决方案。

现状 : 契约(Contract)IEventProcessorImpl.ProcessEventsAsync API & EPH是,

  • 只要EventData可以从 EventHubs 服务接收 - 继续使用 IEventProcessorImplementation.ProcessEventsAsync 调用用户回调( EventData's ) & 如果用户回调在调用时抛出错误,通知 EventProcessorOptions.ExceptionReceived .
  • 里面的用户代码 IEventProcessorImpl.ProcessEventsAsync应该处理所有错误并合并 Retry's有必要的。 EPH不会在此回调上设置任何超时,以让用户完全控制处理时间。
  • 如果特定事件是问题的原因 - 标记 EventData具有特殊属性 - 例如:type= poison-event并重新发送到相同的 EventHub (包括指向实际事件的指针,将这些 EventData.OffsetSequenceNumber 复制到新的 EventData.ApplicationProperties 中)或将其转发到 SERVICEBUS 队列或将其存储在其他地方,基本上,识别并推迟处理中毒事件 .
  • 如果您处理了所有可能的情况并且仍然遇到 Exceptions - 捕获他们并关闭 EPHfailfast有这个异常(exception)的过程。当EPH回来 - 它将从它离开的地方开始。


  • 为什么检查点“旧事件”不起作用 (阅读 this 以了解 EPH 一般):

    幕后制作,EPH正在为每个 EventHub 消费者组分区的接收器运行一个泵 - 其工作是从给定的 checkpoint 启动接收器(如果存在)并创建 IEventProcessor 的专用实例实现然后receive来自指定的 EventHub 分区来自指定的 Offset在检查点(如果不存在 - EventProcessorOptions.initialOffsetProvider)并最终调用 IEventProcessorImpl.ProcessEventsAsync . Checkpoint的目的是为了能够可靠地开始处理消息,当 EPH process Shutsdown 和 Partition 的所有权转移到另一个 EPH实例。所以,checkpoint只会在启动 PUMP 时消耗,并且会 不是 读,一旦泵启动。

    在我写这篇文章时,EPH是在版本 2.2.10 .

    more general reading on Event Hubs...

    关于c# - 强制 EventProcessorHost 重新传递失败的 Azure 事件中心 eventData 到 IEventProcessor.ProcessEvents 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41006498/

    相关文章:

    c# - 使用edsdk 2.8拍照并直接将图像保存到电脑

    c# - DevExpress:如何更改 SearchLookUpEdit 控件的背景颜色

    c# - c# 控制台应用程序也可以充当类库吗?

    c# - 使用 char 值与 C# 中的变量名进行比较?

    azure - 在 Azure 数据工厂中形成 SFTP 连接 - 无法协商 key 交换算法

    X 轴数据时间的 C# 图表缩放问题

    c# - C# 中的局部变量

    c# - 将导致 "A potentially dangerous Request.Form value was detected..."错误的输入值列表

    c# - 需要帮助为新的 Azure 队列客户端版本设置 QueueMessageEncoding.Base64

    Azure Boards 无法添加用户