c# - 没有依赖注入(inject)的命令总线/调度程序和处理程序注册

标签 c# dependency-injection cqrs service-locator message-bus

我正在尝试实现一个可使用的库,该库在 CQRS 上下文中为每个域提供读/写应用程序服务。 Command Bus(或 Dispatcher,或在这种情况下可以调用的任何名称)接口(interface)可能会公开,也可能不会公开,但应该从消费者那里抽象实现,以鼓励针对接口(interface)定义的契约进行编程。除了使用标准约定之外,我不想要求库的使用者必须在他们的 DI 框架中设置库,因此使用的 DI 框架无关紧要(需要基于约定的 DI 不在这个问题的范围内) .

internal interface ICommandMessage
{
    Guid Id { get; }
    DateTime DateRequestedUtc { get; }
}
internal class BaseCommandMessage
{
    /*... Implementation of ICommandMessage for common command data ...*/
}

internal class ExampleCommand : BaseCommandMessage
{
    /*... Additional data required for command ...*/
}

internal class AnotherExampleCommand : BaseCommandMessage
{
    /*... Additional data required for command ...*/
}

internal interface ICommandHandler<in TCommand> where TCommand : class, ICommandMessage
{
    Task HandleAsync(TCommand command);
}

internal class ExampleCommandHandler : ICommandHandler<ExampleCommand>, ICommandHandler<AnotherExampleCommand>
{
    Task HandleAsync(ExampleCommand command){/*...*/}
    Task HandleAsync(AnotherExampleCommand command){/*...*/}
}

public interface ICommandBus
{
    void Register<TCommand>(ICommandHandler<TCommand> handler) where TCommand : class, ICommandMessage;
    void Dispatch(ICommandMessage command);
}

public interface IDomainWriteService
{
    void Save(int id);
    /*... some other read methods ...*/
}

internal class SomeDomainWriteService
{
    private readonly ICommandBus _Bus;

    public SomeDomainWriteService(ICommandBus bus)
    {
        _Bus = bus;
    }

    public void Save(int id)
    {
        //This will change depending on how ExampleCommand is implemented, etc
        _Bus.Dispatch(new ExampleCommand());
    }
}

主要问题是我希望 ICommandHandler 的内部实现以某种方式自动注册到命令总线,但构造函数不采用泛型,如以下实现所示:

internal public class DependencyInjectedCommandBus : ICommandBus
{
    private readonly Dictionary<Type, Action<ICommandMessage>> _handlers = new Dictionary<Type, Action<ICommandMessage>>();

    public DependencyInjectedCommandBus(List<ICommandHandler<TCommand>> handlers)
    {
        handlers.forEach(h => Register<TCommand>(h));
    }

    public void Register<TCommand>(ICommandHandler<TCommand> handler) where TCommand : class, ICommandMessage
    {
        var type = typeof (TCommand);
        if (_Handlers.ContainsKey(type))
        {
            throw new InvalidOperationException(string.Format("Handler exists for type {0}.", type));
        }
        _Handlers[type] = async command => await handler.HandleAsync((TCommand)command);
    }

    public void Dispatch(ICommandMessage command)
    {
        var type = command.GetType();
        if(!_Handlers.ContainsKey(type)){ return; }
        var handler = _Handlers[type];
        handler(command);
    }
}

使用 DI 容器(在本例中为 Ninject),如果 ICommandBus 稍作更改,则不允许注册的实现可能如下所示:

internal class NinjectCommandBus : ICommandBus
{
    private readonly IKernel _Kernel;

    public NinjectCommandBus(IKernel kernel)
    {
        _Kernel = kernel;
    }

    public void Register<TCommand>(ICommandHandler<TCommand> handler) where TCommand : class, ICommandMessage
    {
        throw new NotImplementedException();
    }

    public async Task DispatchAsync<TCommand>(TCommand command) where TCommand : class, ICommandMessage
    {
        var handler = _Kernel.Get<ICommandHandler<TCommand>>();
        await handler.HandleAsync(command);
    }
}

我也读过类似 this one on Mark Seeman's blog 的文章描述了在没有服务位置或依赖于 DI 容器(通过“穷人的 DI”)的情况下执行此操作的方法,但是我根本无法通过 Ninject 为我提供那种解决方案(更不用说以某种方式使用约定或不依赖在 DI 上为我完成工作),并且似乎需要更多的“锅炉”代码来连接东西。

关于如何在不必在某处显式注册处理程序的情况下继续进行此操作的任何建议?我关于允许通过注册命令处理程序进行扩展的想法是否有效?

最佳答案

如果您不介意扫描程序集以查找命令处理程序,这里有一个与 DI 容器一起使用的简单解决方案。

namespace SimpleCqrs {
  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Reflection;
  using System.Threading.Tasks;

  public interface ICommandMessage {
    Guid Id { get; }
    DateTime DateRequestedUtc { get; }
  }

  internal abstract class BaseCommandMessage : ICommandMessage {
    protected BaseCommandMessage() {
      DateRequestedUtc = DateTime.UtcNow;
      Id = Guid.NewGuid();
    }

    public DateTime DateRequestedUtc {
      get; private set;
    }

    public Guid Id {
      get; private set;
    }
  }

  internal class ExampleCommand : BaseCommandMessage {
    public string Message { get; set; }
  }

  internal class AnotherExampleCommand : BaseCommandMessage {
    public string Message { get; set; }
  }

  internal interface ICommandHandler<in TCommand> where TCommand : class, ICommandMessage
  {
    Task HandleAsync(TCommand command);
  }

  internal class WriteService : ICommandHandler<ExampleCommand>, ICommandHandler<AnotherExampleCommand>
  {
    public Task HandleAsync(AnotherExampleCommand command) {
      return Task.Run(() => {
        Console.WriteLine(command.Message);
      });
    }

    public Task HandleAsync(ExampleCommand command)
    {
      return Task.Run(() =>
      {
        Console.WriteLine(command.Message);
      });
    }
  }

  public interface ICommandBus
  {
   void Dispatch(ICommandMessage command);
  }

  public class SimpleCommandBus : ICommandBus
  {
    Dictionary<Type, Type> handlers;
    MethodInfo dispatchCommand;

    public SimpleCommandBus()
    {
      this.handlers = RegisterCommandHandlers();
      this.dispatchCommand = GetType().GetMethod("DispatchCommand", BindingFlags.NonPublic | BindingFlags.Instance);
    }

    public void Dispatch(ICommandMessage command)
    {
      var cmdType = command.GetType();
      var handler = Activator.CreateInstance(handlers[cmdType]);
      var genericMethod = dispatchCommand.MakeGenericMethod(cmdType);
      genericMethod.Invoke(this, new object[] { handler, command });
    }

    async void DispatchCommand<T>(ICommandHandler<T> handler, T command) where T : class, ICommandMessage
    {
      await handler.HandleAsync(command);
    }

    Dictionary<Type, Type> RegisterCommandHandlers()
    {
      Func<Type, bool> isCommandHandler = t => 
        t.GetInterfaces()
         .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommandHandler<>));

      Func<Type, IEnumerable<Tuple<Type, Type>>> collect = t =>
        t.GetInterfaces().Select(i =>       
          Tuple.Create(i.GetGenericArguments()[0], t));

      return Assembly.GetCallingAssembly()
                     .GetTypes()
                     .Where(t => !t.IsAbstract && !t.IsGenericType)
                     .Where(isCommandHandler)
                     .SelectMany(collect)
                     .ToDictionary(x => x.Item1, x => x.Item2);
    }
  }


  class Program
  {
    static void Main(string[] args)
    {
      var bus = new SimpleCommandBus();
      bus.Dispatch(new ExampleCommand { Message = "Hello" });
      bus.Dispatch(new AnotherExampleCommand { Message = "World" });
      Console.ReadKey();
    }
  }
}

关于c# - 没有依赖注入(inject)的命令总线/调度程序和处理程序注册,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30739865/

相关文章:

java - 如何正确地将许多服务注入(inject) Spring MVC Controller ?

domain-driven-design - CQRS 命令和查询 - 它们是否属于域?

c# - 内存记录过多的 LINQ to SQL

c# - 如何 : Dynamically Creating the XML file content and sending as a text file in asp.net c#

c# - XPathDocument 与 XPathNavigator VS Xmlreader?

java - Dagger :注入(inject)一个类的不同实例

delphi - 如何将字段接口(interface)注入(inject)到对象中

c# - 控制台与 WebAPI 中共享类库的 MediatR 设置

c# - CQRS与提交数据时的冲突

c# - 在 TPL 中中止长时间运行的任务