.net - 通过Unity的同一条消息的多个使用者在MassTransit中不起作用

标签 .net unity-container esb masstransit command-pattern

我最近遇到了很多问题,因为 MassTransit.UnityIntegration 软件包中似乎有一个错误,主要是由于未考虑注册名称的事实。

例如,如果我像这样注册我的类(class):

var container = new UnityContainer()
    .RegisterType<Consumes<Command1>.All, Handler1>("Handler1")
    .RegisterType<Consumes<Command1>.All, Handler3>("Handler3");

几行后,我使用LoadFrom扩展方法来在容器中获取已注册的使用者,如下所示:
IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(_s => _s.LoadFrom(container));
    });

发生的事情是,当关联的消息到达总线时,永远不会调用我的处理程序。

经过一会儿的思考,我决定看一下实现,并且很清楚为什么会发生这种情况:

这是LoadFrom方法内部的主要代码:
public static void LoadFrom(this SubscriptionBusServiceConfigurator configurator, IUnityContainer container)
{
    IList<Type> concreteTypes = FindTypes<IConsumer>(container, x => !x.Implements<ISaga>());
    if (concreteTypes.Count > 0)
    {
        var consumerConfigurator = new UnityConsumerFactoryConfigurator(configurator, container);

        foreach (Type concreteType in concreteTypes)
            consumerConfigurator.ConfigureConsumer(concreteType);
    }

    ...

}

请注意,它仅查找类型,并且不向前传递名称的任何信息。这是FindTypes<T>的实现:
static IList<Type> FindTypes<T>(IUnityContainer container, Func<Type, bool> filter)
{
    return container.Registrations
                        .Where(r => r.MappedToType.Implements<T>())
                        .Select(r => r.MappedToType)
                        .Where(filter)
                        .ToList();
}

进行几次间接操作后,一切都落到了UnityConsumerFactory<T>类内部的这一行上,该行实际上创建了使用者的实例:
var consumer = childContainer.Resolve<T>();

当存在多个注册时,这绝对不能与Unity一起使用,因为在Unity中注册(然后解析)多个实现的唯一方法是在RegisterType调用中为其命名,然后在Resolve调用中指定该名称。

也许我在所有这一切中都缺少了一些基本的东西,而错误在我身上呢? MassTransit Unity组件的源可以在here中找到。我不研究其他容器的代码,因为我对它们不熟悉,但是我认为这已经通过某种方式处理了?我认为在同一个容器中有多个消费者使用同一个消息类型实际上是很常见的。

在这种特定情况下,最好不仅传递容器中注册的Type,而且传递用于注册的名称。

更新

好了,Travis花了一些时间来解释这个问题,这个问题现在更加清楚了。我应该早些注意到它。

看来我应该直接注册类型,以便在工厂内正确解析它们,如下所示:
var container = new UnityContainer()
    .RegisterType<Handler1>()
    .RegisterType<Handler3>();

使用这种方法,我也可以省略注册名称,因为现在它们在容器中的构建 key 是不同的。

好吧,如果这是我们的真实情况,那么这将完美地工作,但事实并非如此。让我解释一下我们到底在做什么:

在开始使用MassTransit之前,我们已经有一个用于命令模式的接口(interface),称为ICommandHandler<TCommand>,其中TCommand是系统中命令的基本模型。当我们开始考虑使用服务总线时,从一开始就很清楚地知道,以后应该可以很容易地切换到另一种服务总线实现。考虑到这一点,我开始在我们的命令界面上创建一个抽象,使其表现像MT期望的消费者之一。这是我想出的:
public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All
    where T : class, ICommand
{
    private readonly ICommandHandler<T> _commandHandler;

    public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
    {
        _commandHandler = commandHandler;
    }

    public void Consume(T _message)
    {
        _commandHandler.Handle(_message);
    }
}

这是一个非常简单的适配器类。它接收ICommandHandler<T>实现,并使它的行为类似于Consumes<T>.All实例。遗憾的是MT required message models to be classes,因为我们的命令没有该约束,但这是一个小麻烦,我们继续将where T : class约束添加到我们的接口(interface)中。

然后,由于我们的处理程序接口(interface)已经在容器中注册,因此只需在此适配器实现中注册MT接口(interface),然后让容器在其之上注入(inject)实际的实现即可。例如,一个更现实的示例(直接从我们的代码库中获取):
.RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
.RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor")
.RegisterType<Consumes<ApplicationInstallationResult>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationResult>>()
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Recorder", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Recorder")))
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Executor", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Executor")))

因为我们现在有两个使用同一条消息的使用者,所以命名的注册有些复杂,但是是必不可少的。尽管没有我们希望的那么干净,但是我们可以忍受,因为这会促进我们的代码与MassTransit特定逻辑的巨大脱钩:适配器类在单独的程序集中,仅由系统的最后一层引用,用于容器注册。 。这似乎是一个很好的主意,但是容器集成类背后的查找逻辑现在不支持这种方法。

请注意,由于中间有一个通用适配器类,因此我无法在此处注册具体的类。

更新2:

在遵循Travis的建议之后,我尝试了这个简单的代码,该代码也不起作用(我似乎看不出原因,因为它看起来非常有效)。这是明确的消费者工厂注册,没有任何自动容器集成:
_sbc.Consume(() => container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))

该resolve调用正确地给了我以前注册的CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>实例,该实例实现了Consumes<ApplicationInstallationCommand>.All,该实例又应成为受支持的THE基本接口(interface)之一。在此之后立即发布ApplicationInstallationCommand无效,就好像处理程序无效或类似。

但这可以工作:
_sbc.Consume(() => (CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>) container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))

显然,API的深层内容是以一种非通用的方式来处理编译类型,而不是将其自身基于通用的接口(interface)。

我的意思是...这是可行的,但是注册代码由于没有明显的原因而变得困惑(由于我认为MT的“非标准实现细节”)。也许我只是在这里抓稻草?也许所有这些都归结为“为什么MT不接受它自己的,已经通用的接口(interface)?”为什么即使在编译时也将我传递给它的实例键入为Consumes<X>.All,在编译时仍需要具体类型来查看它是消息处理程序?

更新3:

在下面与Travis讨论之后,我决定完全删除UnityIntegration程序集,并在订阅上进行独立的Consumer调用。

我在我们的MassTransit特定程序集中创建了一个小型扩展类,以简化操作:
public static class CommandHandlerEx
{
    public static CommandHandlerToConsumerAdapter<T> ToConsumer<T>(this ICommandHandler<T> _handler)
        where T : class, ICommand
    {
        return new CommandHandlerToConsumerAdapter<T>(_handler);
    }
}

最后注册这样的处理程序:
var container = new UnityContainer()
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor");

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(RegisterConsumers);
    });

private void RegisterConsumers(SubscriptionBusServiceConfigurator _s)
{
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Recorder").ToConsumer());
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Executor").ToConsumer());
}

在昨天花费了一整天时间尝试解决问题之后,强烈建议您不要使用容器扩展程序集,如果您希望将预期的行为排除在容器之外,并且/或者如果要自定义类等(例如,我做了解耦的话)我们来自MT特定代码的消息传递类)的主要原因有两个:
  • 扩展中的逻辑遍历容器中的注册以查找使用者类。我认为这是糟糕的设计。如果某人想要从容器中实现,则应仅在其接口(interface)上调用ResolveResolveAll(或非Unity术语中的等效项),而不必关心确切注册的内容及其具体类型。假设容器可以返回未显式注册的类型,则这会对代码产生严重后果。幸运的是,这些类并非如此,但是我们有一个容器扩展,可以根据构建键自动创建装饰器类型,并且不需要在容器上显式注册它们。
  • 消费者注册使用MappedToType实例上的ContainerRegistration属性来调用容器上的Resolve。这在任何情况下都是完全错误的,而不仅仅是在MassTransit的上下文中。 Unity中的类型要么注册为映射(如上述摘录中的FromTo组件),要么直接注册为单个具体类型。在两种情况下,逻辑都应使用RegisteredType类型从容器中解析。现在的工作方式是,如果您碰巧使用其接口(interface)注册了处理程序,则MT将完全绕过您的注册逻辑,而是对具体类型works in Unity out of the box调用resolve,这可能会导致不可预测的行为,因为您认为它应该是单例就像您注册的一样,但是最终它变成了一个临时对象(默认值)。

  • 现在回头看,我可以看到它比我最初相信的要复杂得多。在此过程中也学习了很多东西,所以很好。

    更新4:

    昨天,我决定在进行最终 checkin 之前对整个适配器方法进行一些重构。我也使用MassTransit的接口(interface)模式来创建我的适配器,因为我认为这是一种非常不错且干净的语法。

    结果如下:
    public sealed class CommandHandlerToConsumerAdapter<T>
        where T : class, ICommand
    {
        public sealed class All : Consumes<T>.All
        {
            private readonly ICommandHandler<T> m_commandHandler;
    
            public All(ICommandHandler<T> _commandHandler)
            {
                m_commandHandler = _commandHandler;
            }
    
            public void Consume(T _message)
            {
                m_commandHandler.Handle(_message);
            }
        }
    }
    

    不幸的是,由于引用的Magnum库中的实用程序方法(称为ToShortTypeName的扩展方法)上未处理的异常,这破坏了MassTransit的代码。

    这是异常(exception):

    at System.String.Substring(Int32 startIndex, Int32 length)
    at Magnum.Extensions.ExtensionsToType.ToShortTypeName(Type type)
    at MassTransit.Pipeline.Sinks.ConsumerMessageSink2.<>c__DisplayClass1.<Selector>b__0(IConsumeContext1 context) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Pipeline\Sinks\ConsumerMessageSink.cs:line 51 at MassTransit.Pipeline.Sinks.InboundConvertMessageSink`1.<>c__DisplayClass2.<>c__DisplayClass4.b__1(IConsumeContext x) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Pipeline\Sinks\InboundConvertMessageSink.cs:line 45 at MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(IReceiveContext context) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Context\ServiceBusReceiveContext.cs:line 162

    最佳答案

    尽管我不了解Unity与所有容器的集成,但是您必须将使用者注册为容器中的具体类型,而不是Consumes<>接口(interface)。我认为这只是RegisterType<Handler1, Handler1>(),但我对此不太确定。

    如果您不喜欢容器的LoadFrom扩展名,则无需使用它。您始终可以自己解决使用者,并通过_sbc.Consume(() => container.resolve<YourConsumerType>())在配置中注册他们。 LoadFrom扩展对于以常见方式使用容器的人来说服了。

    以下代码可以正常工作,它以我期望的方式使用容器,而又不了解您的域,因此可以使用它。如果您想更好地了解消息的绑定(bind)方式,建议您使用RabbitMQ,因为您可以通过放弃交换绑定(bind)来轻松了解最终的结果。在这一点上,这已经远远超出了SO的问题,如果您还有其他任何疑问,我会将其带到邮件列表中。

    using System;
    using MassTransit;
    using Microsoft.Practices.Unity;
    
    namespace MT_Unity
    {
        class Program
        {
            static void Main(string[] args)
            {
                using (var container = new UnityContainer()
                    .RegisterType<ICommandHandler<MyCommand>, MyCommandHandler>()
                    .RegisterType<CommandHandlerToConsumerAdapter<MyCommand>>())
    
                using (IServiceBus consumerBus = ServiceBusFactory.New(sbc =>
                        {
                            sbc.ReceiveFrom("rabbitmq://localhost/consumer");
                            sbc.UseRabbitMq();
    
    
                            sbc.Subscribe(s => s.Consumer(() => container.Resolve<CommandHandlerToConsumerAdapter<MyCommand>>()));
                        }))
                using (IServiceBus publisherBus = ServiceBusFactory.New(sbc =>
                        {
                            sbc.ReceiveFrom("rabbitmq://localhost/publisher");
                            sbc.UseRabbitMq();
                        }))
                {
                    publisherBus.Publish(new MyCommand());
    
                    Console.ReadKey();
                }
            }
        }
    
        public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All where T : class, ICommand
        {
            private readonly ICommandHandler<T> _commandHandler;
    
            public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
            {
                _commandHandler = commandHandler;
            }
    
            public void Consume(T message)
            {
                _commandHandler.Handle(message);
            }
        }
    
        public interface ICommand { }
        public class MyCommand : ICommand { }
    
        public interface ICommandHandler<T> where T : class, ICommand
        {
            void Handle(T message);
        }
    
        public class MyCommandHandler : ICommandHandler<MyCommand>
        {
            public MyCommandHandler()
            {
    
            }
            public void Handle(MyCommand message)
            {
                Console.WriteLine("Handled MyCommand");
            }
        }
    
    }
    

    关于.net - 通过Unity的同一条消息的多个使用者在MassTransit中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20710607/

    相关文章:

    wpf - Prism:创建外壳后将模块加载到目录中

    dependency-injection - 需要 WCF 的完整 DI 示例

    asp.net-web-api - 统一容器 : what is default lifetimemanager

    javascript - Mule API 控制台如何工作?

    java - 如何为自定义消息处理器实现自定义 SamplingService?在检索元素之后和执行序列之前记录

    java - Apache Camel 中按时间顺序排列两个来源的消息

    .net - ORACLE SDO_GEOMETRY ToString()?

    c# - 查询时丢失对子对象的引用

    .net - 在 .net 中验证和读取 xml 文件的最简单方法?

    c# - 如何更改 WCF 中的 SOAP 信封架构?