c# - 如何使用 Masstransit 测试基于命令和事件的系统

标签 c# masstransit

我有一个命令处理程序,它调用域对象上的操作,该操作执行后又会触发一个事件。我想测试事件处理程序在发送相应命令时是否接收到事件(见下文,为简洁起见省略了一些代码)。事件处理程序 (MyEventConsumer.Consume) 永远不会被调用,即使事件消息发布在总线上(在本例中为环回总线)。有什么想法吗?

//Test
[TestFixture]
public class TestSendCommandReceiveEvent
{
    [Given]
    public void installation_of_infrastructure_objects()
    {
        container.Register(Component.For<MyEventConsumer>().UsingFactoryMethod(() => new MyEventConsumer(_received)));
        container.Register(
        Component.For<IServiceBus>()
        .UsingFactoryMethod(() => ServiceBusFactory.New(x => { x.ReceiveFrom("loopback://localhost/mt_client"); x.Subscribe(conf => conf.LoadFrom(container));                                                      })));
    }

    [When]
    public void sending_a_command()
    {
         var LocalBus = container.Resolve<IServiceBus>();
         LocalBus.Publish(new DoSomething(_aggregateId));
    }
    [Then]
    public void corresponding_event_should_be_received_by_consumer()
    {
        _received.WaitOne(5000).ShouldBeTrue();
    }
}
public class MyEventConsumer : Consumes<SomethingDone>.All
{
     private readonly ManualResetEvent _received;
     public MyEventConsumer(ManualResetEvent received)
     {
         _received = received;
     }
     public void Consume(SomethingDone message)
     {
         _received.Set();
     }
}

//Command handler
public class DoSomethingCommandHandler : Consumes<DoSomething>.All where T:class
{
    public void Consume(DoSomething message)
    {
       var ar = Repository.GetById<SomeAR>(message.ArId);
       ar.DoSomething();
       Repository.Save(ar, Guid.NewGuid(), null);
    }
}
//Domain object
public class SomeDomainObject : AggregateBase
{
    public void DoSomething()
    {
       RaiseEvent(new SomethingDone(Id, 1));
    }
}

最佳答案

这对我来说是:

// Copyright 2012 Henrik Feldt
//  
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use 
// this file except in compliance with the License. You may obtain a copy of the 
// License at 
// 
//     http://www.apache.org/licenses/LICENSE-2.0 
// 
// Unless required by applicable law or agreed to in writing, software distributed 
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
// CONDITIONS OF ANY KIND, either express or implied. See the License for the 
// specific language governing permissions and limitations under the License.

using System;
using System.Threading;
using Castle.MicroKernel.Registration;
using Castle.Windsor;
using Magnum.Extensions;
using Magnum.TestFramework;
using MassTransit;
using NUnit.Framework;

namespace ConsoleApplication11
{
    [TestFixture]
    public class TestSendCommandReceiveEvent
    {
        ManualResetEventSlim _received = new ManualResetEventSlim(false);
        IWindsorContainer _container;

        [Given]
        public void installation_of_infrastructure_objects()
        {
            _container = new WindsorContainer();
            _container.Register(
                Component.For<IServiceBus>()
                    .UsingFactoryMethod(() => ServiceBusFactory.New(x =>
                        {
                            x.ReceiveFrom("loopback://localhost/mt_client");
                            x.Subscribe(conf =>
                                {
                                    conf.Consumer(() => new MyEventConsumer(_received));
                                    conf.Consumer(() => new MyCmdConsumer());
                                });
                        })));

            when();
        }

        public void when()
        {
            var localBus = _container.Resolve<IServiceBus>();
            // wait for startup
            localBus.Endpoint.InboundTransport.Receive(c1 => c2 => { }, 1.Milliseconds()); 

            localBus.Publish(new DoSomething());
        }

        [Then]
        public void corresponding_event_should_be_received_by_consumer()
        {
            _received.Wait(5000).ShouldBeTrue();
        }
    }

    [Serializable]
    public class DoSomething
    {
    }

    [Serializable]
    public class SomethingDone
    {
    }

    public class MyEventConsumer : Consumes<SomethingDone>.All
    {
        readonly ManualResetEventSlim _received;

        public MyEventConsumer(ManualResetEventSlim received)
        {
            _received = received;
        }

        public void Consume(SomethingDone message)
        {
            _received.Set();
        }
    }

    public class MyCmdConsumer : Consumes<DoSomething>.Context
    {
        public void Consume(IConsumeContext<DoSomething> ctx)
        {
            Console.WriteLine("consumed cmd");
            ctx.Bus.Publish(new SomethingDone());
        }
    }
}

关于c# - 如何使用 Masstransit 测试基于命令和事件的系统,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9061708/

相关文章:

C# 函数引用重载方法

C# mysql executenonqueryasync 不是异步的

rabbitmq - 公共(public)交通 : ensure message processing order when there are different message types

rabbitmq - MassTransit 丢失消息 - Rabbit MQ - 当发布者和消费者端点名称相同时,

c# - DI 与大众运输消费者和简单注入(inject)器的问题

c# - 在绑定(bind)字符串上使用时无法使简单转换器工作

c# - 我可以停止服务引用生成 ArrayOfString 而不是 string[] 吗?

c# - 监听另一个应用程序的退出

validation - MassTransit 3.2.1 - 验证

c# - 为什么 MassTransit 不将消息推送到 MSMQ,除非有订阅者?这是如何解决的?