c# - Brighter Consumer 消息未路由到处理程序

标签 c# .net-core command event-sourcing brighter

我正在尝试将 Brighter 用于命令/事件源。我有一个包含用于将消息放入队列的 .NET Core Web Api 服务的解决方案和另一个包含用于从队列中获取消息的 .NET Core 控制台项目的解决方案。这些服务是隔离的,不在同一个解决方案中。

消息调度程序确实将消息从 Rabbit 中拉出并将其路由到 MessageMapper,但是消息没有找到到达处理程序进行处理的方式。

Visual Studio 2015、.NET Core 1.1、Paramore.Brighter.MessagingGateway.RMQ 7.1.5、Paramore.Brighter.ServiceActivator 7.1.5、StructureMap.Microsoft.DependencyInjection 1.4.0。

控制台应用程序中的配置:

public static void Main(string[] args)
{
    RetryPolicy retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new List<TimeSpan>()
    {
        TimeSpan.FromMilliseconds(50),
        TimeSpan.FromMilliseconds(100),
        TimeSpan.FromMilliseconds(150)
    });

    CircuitBreakerPolicy circuitBreakerPolicy = Policy.Handle<Exception>().CircuitBreaker(1, TimeSpan.FromMilliseconds(500));
    PolicyRegistry policyRegistry = new PolicyRegistry() { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } };

    var subscriberRegistry = new SubscriberRegistry();
    subscriberRegistry.Register<ApplicationUpdateCommand, ApplicationUpdateCommandHandler>();

    var rmqConnnection = new RmqMessagingGatewayConnection
    {
        AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672/%2f")),
        Exchange = new Exchange("api.coverage.exchange"),
    };

    var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnnection);
    var rmqMessageProducerFactory = new RmqMessageProducerFactory(rmqConnnection);

    Dispatcher dispatcher = null;
    var container = new Container();
    container.Configure(config =>
    {
        config.For<IHandleRequests<ApplicationUpdateCommand>>().Use<ApplicationUpdateCommandHandler>();
        var servicesMessageMapperFactory = new ServicesMessageMapperFactory(container);
        var messageMapperRegistry = new MessageMapperRegistry(servicesMessageMapperFactory)
        {
            {typeof(ApplicationUpdateCommand), typeof(ApplicationUpdateCommandMessageMapper) }
        };

        var servicesHandlerFactory = new ServicesHandlerFactory(container);

        var commandProcessor = CommandProcessorBuilder.With()
            .Handlers(new HandlerConfiguration(subscriberRegistry, servicesHandlerFactory))
            .Policies(policyRegistry)
            .NoTaskQueues()
            .RequestContextFactory(new InMemoryRequestContextFactory())
            .Build();

            dispatcher = DispatchBuilder.With()
                                        .CommandProcessor(commandProcessor)
                                       .MessageMappers(messageMapperRegistry)
                                        .DefaultChannelFactory(new InputChannelFactory(rmqMessageConsumerFactory, rmqMessageProducerFactory))
                                        .Connections(new List<Connection>()
                                        {
                                            new Connection<ApplicationUpdateCommand>
                                            (
                                                new ConnectionName("Application.Update"), 
                                                new ChannelName("Application.Update"), 
                                                new RoutingKey("Application.Update")
                                            )
                                        }).Build();
        });

        dispatcher.Receive();

        Console.WriteLine("Press enter to stop ...");
        Console.ReadLine();

        dispatcher.End().Wait();
    }

MessageMapper、命令和处理程序的代码:

public class ApplicationUpdateCommandMessageMapper : IAmAMessageMapper<ApplicationUpdateCommand>
{
    public Message MapToMessage(ApplicationUpdateCommand request)
    {
        var header = new MessageHeader(messageId: request.Id, topic: "Application.Update", messageType: MessageType.MT_EVENT);
        var body = new MessageBody(JsonConvert.SerializeObject(request));
        var message = new Message(header, body);
        return message;
    }

    public ApplicationUpdateCommand MapToRequest(Message message)
    {
        // dispatcher will route message here but that is it
        ApplicationUpdateCommand command = JsonConvert.DeserializeObject<ApplicationUpdateCommand>(message.Body.Value);
        return command;
    }
}

public class ApplicationUpdateCommand : Command
{
    public int ApplicationId { get; private set; }
    public string ApplicantName { get; private set; }

    public ApplicationUpdateCommand(Guid id, int applicationId, string applicantName)
        : base(id)
    {
        ApplicationId = applicationId;
        ApplicantName = applicantName;
    }
}

public class ApplicationUpdateCommandHandler : RequestHandler<ApplicationUpdateCommand>
{
    private readonly IAmACommandProcessor _commandProcessor;

    public ApplicationUpdateCommandHandler(IAmACommandProcessor commandProcessor)
    {
        _commandProcessor = commandProcessor;
    }

    public override ApplicationUpdateCommand Handle(ApplicationUpdateCommand command)
    {
        // would like to get here to handle command

        return base.Handle(command);
    }
}

最佳答案

您在 header 中标识为 MessageType.MT_EVENt,但派生自 Command。两者应该一致,要么派生自Event,要么使用MT_COMMAND

关于c# - Brighter Consumer 消息未路由到处理程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46333455/

相关文章:

azure - 需要帮助将项目插入 Cosmos Collection

powershell - 电源外壳 : A command as a variable with parameter

regex - 带正则表达式的 sed 命令

c# - 抛出了 'Microsoft.Online.Administration.Automation.MicrosoftOnlineException' 类型的异常

c# - MVVM WPF : Get usercontrol name from ViewModel

c# - OData 返回 'Cannot find the services container for route ' odata-Unversioned'

c# - 克隆 Office Open XML 文档的最有效方法是什么?

c# - 为什么在我的 WebClient DownloadFileAsync 方法中下载空文件?

c# - 在 Mongodb C# 中使用 AsQueryable 进行过滤 - ExpandoObject

node.js - `npm i`命令有什么作用?