nservicebus - 处理程序中的递归 Bus.Send()(事务、线程、任务)

标签 nservicebus

我有一个类似于下面的处理程序,它基本上响应命令并将一大堆命令发送到不同的队列。

    public void Handle(ISomeCommand message)
    {
        int i=0;
        while (i < 10000)
        {
            var command = Bus.CreateInstance<IAnotherCommand>();
            command.Id = i;
            Bus.Send("target.queue@d1555", command);
            i++;
        }
    }

此 block 的问题是,在循环完全完成之前,没有任何消息出现在目标队列或传出队列中。有人可以帮助我理解这种行为吗?

此外,如果我使用任务在处理程序中发送消息,如下所示,消息会立即出现。所以关于这个的两个问题,

  1. 关于立即执行基于任务的发送的解释是什么?
  2. 在消息处理程序中使用任务是否有任何影响?

    public void Handle(ISomeCommand message)
    {
        int i=0;
        while (i < 10000)
        {
            System.Threading.ThreadPool.QueueUserWorkItem((args) =>
            {
                var command = Bus.CreateInstance<IAnotherCommand>();
                command.Id = i;
                Bus.Send("target.queue@d1555", command);
                i++;
            });
        }
    }
    

非常感谢您的宝贵时间!

最佳答案

第一个问题:从队列中挑选一条消息,为它运行所有已注册的消息处理程序以及任何其他事务操作(例如写入新消息或写入数据库)在一个事务中执行。要么全部完成,要么一个都不完成。所以您看到的是预期的行为:从队列中挑选一条消息,处理 ISomeCommand 并写入 10000 个新的 IAnotherCommand 要么完全完成,要么什么都不做。要避免此行为,您可以执行以下操作之一:

  1. 将您的 NServiceBus 端点配置为非事务性

    public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher,IWantCustomInitialization
    {
        public void Init()
        {
            Configure.With()
                .DefaultBuilder()
                .XmlSerializer()
                .MsmqTransport()
                .IsTransactional(false)
                .UnicastBus();
        }
    }
    
  2. 将 IAnotherCommand 的发送包装在抑制环境事务的事务范围内。

    public void Handle(ISomeCommand message)
    { 
        using (new TransactionScope(TransactionScopeOption.Suppress)) 
        { 
            int i=0; 
            while (i < 10000) 
            { 
                var command = Bus.CreateInstance(); 
                command.Id = i; 
                Bus.Send("target.queue@d1555", command); 
                i++; 
            } 
        } 
    } 
    
  3. 通过使用 System.Threading.ThreadPool.QueueUserWorkItem 或 Task 类自己启动一个新线程,在另一个线程上发出 Bus.Send。这是可行的,因为环境事务不会自动转移到新线程。

第二个问题:使用 Tasks 或我提到的任何其他方法的后果是您没有对整个事情的事务保证。

生成了5000个IAnotherMessage突然断电了怎么处理?

如果您使用 2) 或 3),原始 ISomeMessage 将不会完成,并且会在您再次启动端点时由 NServiceBus 自动重试。最终结果:5000 + 10000 IAnotherCommands。

如果您使用 1),您将完全失去 IAnotherMessage,最终只有 5000 个 IAnotherCommand。

使用推荐的事务方式,最初的 5000 个 IAnotherCommand 将被丢弃,原始 ISomeMessage 返回队列并在端点再次启动时重试。净结果:10000 IAnotherCommands。

关于nservicebus - 处理程序中的递归 Bus.Send()(事务、线程、任务),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13386745/

相关文章:

c# - Nservicebus 端点命名

dependency-injection - NServiceBus 依赖注入(inject)

nhibernate - NHibernate 的 NServiceBus 警告

web-applications - 在Web应用程序中使用NServiceBus

azure - 本地 NServicebus 应用程序从 Azure ServiceBus 队列接收消息

c# - JsonConvert 和 NServiceBus 命名空间冲突

inversion-of-control - NserviceBus 属性注入(inject)

c# - NServiceBus 基本消息发布器,无需 ravenDB

c# - CQRS NServiceBus UI 反馈

nservicebus - NEventStore NServiceBus 设置