该应用程序使用 .NET 4.6.1 和 Microsoft.Azure.ServiceBus.EventProcessorHost nuget package v2.0.2 ,以及它的依赖项 WindowsAzure.ServiceBus package v3.0.1处理 Azure 事件中心消息。
该应用程序具有 IEventProcessor
的实现.当从 ProcessEventsAsync
抛出未处理的异常时方法EventProcessorHost
永远不会将这些消息重新发送到正在运行的 IEventProcessor
实例. (有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送。)
有没有办法强制导致异常的事件消息由 EventProcessorHost
重新发送?到 IEventProcessor
执行?
此评论中针对几乎相同的问题提供了一种可能的解决方案:
Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync
该评论建议保留最后成功处理的事件消息的副本,并在 ProcessEventsAsync
中发生异常时使用该消息显式检查点。 .但是,在实现并测试了这样的解决方案后,EventProcessorHost
仍然不重新发送。实现非常简单:
private EventData _lastSuccessfulEvent;
public async Task ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
try
{
await ProcessEvents(context, messages); // does actual processing, may throw exception
_lastSuccessfulEvent = messages
.OrderByDescending(ed => ed.SequenceNumber)
.First();
}
catch(Exception ex)
{
await context.CheckpointAsync(_lastSuccessfulEvent);
}
}
行动中的事物分析:
部分日志示例可在此处获得:https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt
最佳答案
TLDR : 唯一可靠的方法将失败的一批事件重播到 IEventProcessor.ProcessEventsAsync
是- Shutdown
EventProcessorHost
(又名 EPH
)立即 - 使用 eph.UnregisterEventProcessorAsync()
或通过 terminating the process - 视情况而定。这会让其他 EPH
实例获取此分区的租约并从上一个检查点开始。
在解释这个之前 - 我想指出,这是一个 好问题 确实,这是我们必须为 EPH
做出的最艰难的设计选择之一.在我看来,这是一种权衡黑白:usability
/supportability
的EPH
框架,vs Technical-Correctness
.
理想情况本来是:当用户代码在 IEventProcessorImpl.ProcessEventsAsync
中时抛出异常 - EPH
图书馆不应该捕获这个。应该让这个Exception
- 使进程和 crash-dump
崩溃清楚地显示了 callstack
负责任的。我仍然相信-这是最technically-correct
解决方案。
现状 : 契约(Contract)IEventProcessorImpl.ProcessEventsAsync
API & EPH
是,
EventData
可以从 EventHubs 服务接收 - 继续使用 IEventProcessorImplementation.ProcessEventsAsync
调用用户回调( EventData's
) & 如果用户回调在调用时抛出错误,通知 EventProcessorOptions.ExceptionReceived
. IEventProcessorImpl.ProcessEventsAsync
应该处理所有错误并合并 Retry's
有必要的。 EPH
不会在此回调上设置任何超时,以让用户完全控制处理时间。 EventData
具有特殊属性 - 例如:type= poison-event
并重新发送到相同的 EventHub
(包括指向实际事件的指针,将这些 EventData.Offset
和 SequenceNumber
复制到新的 EventData.ApplicationProperties
中)或将其转发到 SERVICEBUS 队列或将其存储在其他地方,基本上,识别并推迟处理中毒事件 . Exceptions
- 捕获他们并关闭 EPH
或 failfast
有这个异常(exception)的过程。当EPH
回来 - 它将从它离开的地方开始。为什么检查点“旧事件”不起作用 (阅读 this 以了解
EPH
一般):幕后制作,
EPH
正在为每个 EventHub 消费者组分区的接收器运行一个泵 - 其工作是从给定的 checkpoint
启动接收器(如果存在)并创建 IEventProcessor
的专用实例实现然后receive
来自指定的 EventHub 分区来自指定的 Offset
在检查点(如果不存在 - EventProcessorOptions.initialOffsetProvider
)并最终调用 IEventProcessorImpl.ProcessEventsAsync
. Checkpoint
的目的是为了能够可靠地开始处理消息,当 EPH
process Shutsdown 和 Partition 的所有权转移到另一个 EPH
实例。所以,checkpoint
只会在启动 PUMP 时消耗,并且会 不是 读,一旦泵启动。在我写这篇文章时,
EPH
是在版本 2.2.10 .more general reading on Event Hubs...
关于c# - 强制 EventProcessorHost 重新传递失败的 Azure 事件中心 eventData 到 IEventProcessor.ProcessEvents 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41006498/