我发现后端消息处理中出现了常见的模式:
ServiceA生成大量消息。
ServiceB一次处理一条消息。
ServiceC发出对数据库或Web服务的调用,该调用从批处理中获得了可观的性能和可靠性。
在某些情况下,预先分批来自ServiceA的消息或在ServiceB中批量处理消息是不可行的,因此优先选择单独处理所有消息,直到在ServiceC进行最终调用。这要求在ServiceC调用之前执行批处理步骤。
看似理想的做法是拥有一个NServiceBus处理程序签名,该签名可以有选择地分批传递消息,例如:
public void Handle(FooMessage[] messageBatch)
{
}
在执行处理程序之前,未提交messageBatch中的任何消息。
NServiceBus本机似乎不支持此功能。我可以一次处理队列中的消息并写入内存,直到批量刷新为止。但是在这种情况下,消息是在刷新之前提交的,如果过程崩溃,我们将不为批处理中的所有消息保留传递保证。
所以问题是:由于某种我没有想到的原因,这是一种不好的模式吗?我知道知道何时刷新该批处理存在一个固有的问题,但是似乎至少有一些传输实现将消息缓冲在已经隐藏的批处理中,并且一次只能传递一次。在此级别进行批处理或为定期刷新设置简单的超时似乎可行。
是否有解决方法或我缺少的首选模式?
最佳答案
前期/免责声明:我为NServiceBus的制造商Partial Software工作。我还写了Learning NServiceBus。
历史
在我为Particular工作之前,我曾经发现自己处在确切的位置。我有一种分析类型的情况,其中12台Web服务器通过MSMQ发送相同类型的命令以表明已查看文章。这些计数需要在数据库中进行跟踪,以便可以基于视图数生成“最受欢迎”列表。但是每个页面视图的插入效果都不好,因此我介绍了服务总线。
使用表值参数一次最多可以插入50-100个,插入程序可能会从中受益,但是NServiceBus在事务中一次只给您一条消息。
为什么不使用传奇?
在NServiceBus中,对多个消息进行操作的所有对象通常都需要使用Saga。 (Saga基本上是一堆相关的消息处理程序,在处理每个消息之间保持一些存储状态。)
但是Saga必须将其数据存储在某个地方,这通常意味着一个数据库。因此,让我们比较一下:
现在使用NServiceBus,输入50条消息将意味着插入50个数据库。
假设有一个批量接收,则50条消息意味着1个数据库批量插入。
使用Sagas,50条消息意味着50次读取Saga数据+ 50次Saga数据更新,然后是单个数据库批处理插入。
因此,Saga使“持久性负担”变得更糟。
当然,您可以选择对Saga使用内存持久性。这将使您进行批处理而没有额外的持久性开销,但是如果Saga端点崩溃,则可能会丢失部分批处理。因此,如果您不愿意丢失数据,那不是一种选择。
批量接收会是什么样子?
因此,即使在几年前,我也看到了类似的东西:
// Not a real NServiceBus thing! Only exists in my imagination!
public interface IHandleMessageBatches<TMessage>
{
void Handle(TMessage[] messages);
int MaxBatchSize { get; }
}
想法是,如果消息传输可以向前窥视并看到许多可用消息,则它可以开始接收到MaxBatchSize,您将立即获得所有消息。当然,如果队列中只有1条消息,则将得到包含1条消息的数组。
问题
几年前,我坐在NServiceBus代码库中,以为我会尝试实现这一点。好吧,我失败了。当时,即使MSMQ是唯一的传输(在NServiceBus V3中),该API的结构也使得该传输代码会窥视队列并一次提取一条消息,从而在内存事件中引发了消息处理逻辑,不进行大规模的重大更改就不可能改变这一点。
最新版本中的代码更加模块化,很大程度上是因为现在支持多种消息传输。但是,仍然有一个假设是一次处理一个消息。
V6中的当前实现是在
IPushMessages
接口中。在Initialize
方法中,Core将Func<PushContext, Task> pipe
推送到传输的IPushMessages
实现中。或用英语说:“嘿,运输,当您有可用消息时,执行此操作以将其移交给Core,我们将从那里获取它。”
简而言之,这是因为NServiceBus旨在一次可靠地处理一条消息。从更详细的角度来看,批次接收变得困难的原因有很多:
在进行交易时,要接收批处理,需要处理该交易中的所有消息。如果交易量太大,这很容易失控。
消息类型可以混合在队列中。毕竟,消息类型只是标头。无法说“给我一批T型消息”。如果您收到一个批处理并且其中包含其他消息类型,该怎么办?
多个处理程序可以在相同的消息类型上运行。例如,如果消息
SuperMessage
继承了BaseMessage
,则两种类型的处理程序都可以在同一条消息上运行。考虑一批消息时,多个处理程序和多态消息处理程序的这种可能性变得非常复杂。有关多态消息的更多信息,如果批处理是
Handle(BaseMessage[] batch)
,但是传入的消息是全部都继承自BaseMessage
的不同超类型怎么办?我敢肯定,我还没有想到很多其他事情。
总而言之,将NServiceBus更改为接受批次将需要针对批次优化整个管道。单个消息(当前规范)将是一个专门的批处理,其中数组大小为1。
因此,从根本上来说,这对于更改所提供的有限商业价值而言,风险太大。
推荐建议
我发现,每条消息进行一次插入并不像我想的那样昂贵。不利的是,多个Web服务器上的多个线程试图一次写入数据库,并被困在该RPC操作中,直到完成为止。
当这些操作序列化到队列中,并且有限数量的线程处理这些消息并以数据库可以处理的速率进行数据库插入时,大多数情况下,事情往往会相当顺利地运行。
另外,请仔细考虑您在数据库中的工作。现有行的更新比插入行便宜得多。就我而言,我实际上只关心计数,不需要每个单独的页面浏览量的记录。因此,根据内容ID和5分钟的时间窗口更新记录,并更新该记录的读取计数,而不是每次读取都插入一条记录并强迫自己进行大量汇总查询,这样比较便宜。
如果这绝对行不通,则需要考虑可以在可靠性方面进行哪些折衷。您可以使用具有内存持久性的Saga,但随后您可能(很可能最终)丢失整个批次。根据您的用例,这可能是可以接受的。
您还可以使用消息处理程序写入Redis,这比数据库便宜,然后有一个Saga充当调度程序,将数据批量迁移到数据库。您可能可以使用Kafka或其他许多技术来做类似的事情。在这种情况下,由您来决定需要哪种可靠性保证并设置可以提供的可靠性工具。
关于c# - NServiceBus批处理消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34296771/