我有一个类似于下面的处理程序,它基本上响应命令并将一大堆命令发送到不同的队列。
public void Handle(ISomeCommand message)
{
int i=0;
while (i < 10000)
{
var command = Bus.CreateInstance<IAnotherCommand>();
command.Id = i;
Bus.Send("target.queue@d1555", command);
i++;
}
}
此 block 的问题是,在循环完全完成之前,没有任何消息出现在目标队列或传出队列中。有人可以帮助我理解这种行为吗?
此外,如果我使用任务在处理程序中发送消息,如下所示,消息会立即出现。所以关于这个的两个问题,
- 关于立即执行基于任务的发送的解释是什么?
在消息处理程序中使用任务是否有任何影响?
public void Handle(ISomeCommand message) { int i=0; while (i < 10000) { System.Threading.ThreadPool.QueueUserWorkItem((args) => { var command = Bus.CreateInstance<IAnotherCommand>(); command.Id = i; Bus.Send("target.queue@d1555", command); i++; }); } }
非常感谢您的宝贵时间!
最佳答案
第一个问题:从队列中挑选一条消息,为它运行所有已注册的消息处理程序以及任何其他事务操作(例如写入新消息或写入数据库)在一个事务中执行。要么全部完成,要么一个都不完成。所以您看到的是预期的行为:从队列中挑选一条消息,处理 ISomeCommand 并写入 10000 个新的 IAnotherCommand 要么完全完成,要么什么都不做。要避免此行为,您可以执行以下操作之一:
将您的 NServiceBus 端点配置为非事务性
public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher,IWantCustomInitialization { public void Init() { Configure.With() .DefaultBuilder() .XmlSerializer() .MsmqTransport() .IsTransactional(false) .UnicastBus(); } }
将 IAnotherCommand 的发送包装在抑制环境事务的事务范围内。
public void Handle(ISomeCommand message) { using (new TransactionScope(TransactionScopeOption.Suppress)) { int i=0; while (i < 10000) { var command = Bus.CreateInstance(); command.Id = i; Bus.Send("target.queue@d1555", command); i++; } } }
通过使用 System.Threading.ThreadPool.QueueUserWorkItem 或 Task 类自己启动一个新线程,在另一个线程上发出 Bus.Send。这是可行的,因为环境事务不会自动转移到新线程。
第二个问题:使用 Tasks 或我提到的任何其他方法的后果是您没有对整个事情的事务保证。
生成了5000个IAnotherMessage突然断电了怎么处理?
如果您使用 2) 或 3),原始 ISomeMessage 将不会完成,并且会在您再次启动端点时由 NServiceBus 自动重试。最终结果:5000 + 10000 IAnotherCommands。
如果您使用 1),您将完全失去 IAnotherMessage,最终只有 5000 个 IAnotherCommand。
使用推荐的事务方式,最初的 5000 个 IAnotherCommand 将被丢弃,原始 ISomeMessage 返回队列并在端点再次启动时重试。净结果:10000 IAnotherCommands。
关于nservicebus - 处理程序中的递归 Bus.Send()(事务、线程、任务),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13386745/