我正在尝试将 Brighter 用于命令/事件源。我有一个包含用于将消息放入队列的 .NET Core Web Api 服务的解决方案和另一个包含用于从队列中获取消息的 .NET Core 控制台项目的解决方案。这些服务是隔离的,不在同一个解决方案中。
消息调度程序确实将消息从 Rabbit 中拉出并将其路由到 MessageMapper,但是消息没有找到到达处理程序进行处理的方式。
Visual Studio 2015、.NET Core 1.1、Paramore.Brighter.MessagingGateway.RMQ 7.1.5、Paramore.Brighter.ServiceActivator 7.1.5、StructureMap.Microsoft.DependencyInjection 1.4.0。
控制台应用程序中的配置:
public static void Main(string[] args)
{
RetryPolicy retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new List<TimeSpan>()
{
TimeSpan.FromMilliseconds(50),
TimeSpan.FromMilliseconds(100),
TimeSpan.FromMilliseconds(150)
});
CircuitBreakerPolicy circuitBreakerPolicy = Policy.Handle<Exception>().CircuitBreaker(1, TimeSpan.FromMilliseconds(500));
PolicyRegistry policyRegistry = new PolicyRegistry() { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } };
var subscriberRegistry = new SubscriberRegistry();
subscriberRegistry.Register<ApplicationUpdateCommand, ApplicationUpdateCommandHandler>();
var rmqConnnection = new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672/%2f")),
Exchange = new Exchange("api.coverage.exchange"),
};
var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnnection);
var rmqMessageProducerFactory = new RmqMessageProducerFactory(rmqConnnection);
Dispatcher dispatcher = null;
var container = new Container();
container.Configure(config =>
{
config.For<IHandleRequests<ApplicationUpdateCommand>>().Use<ApplicationUpdateCommandHandler>();
var servicesMessageMapperFactory = new ServicesMessageMapperFactory(container);
var messageMapperRegistry = new MessageMapperRegistry(servicesMessageMapperFactory)
{
{typeof(ApplicationUpdateCommand), typeof(ApplicationUpdateCommandMessageMapper) }
};
var servicesHandlerFactory = new ServicesHandlerFactory(container);
var commandProcessor = CommandProcessorBuilder.With()
.Handlers(new HandlerConfiguration(subscriberRegistry, servicesHandlerFactory))
.Policies(policyRegistry)
.NoTaskQueues()
.RequestContextFactory(new InMemoryRequestContextFactory())
.Build();
dispatcher = DispatchBuilder.With()
.CommandProcessor(commandProcessor)
.MessageMappers(messageMapperRegistry)
.DefaultChannelFactory(new InputChannelFactory(rmqMessageConsumerFactory, rmqMessageProducerFactory))
.Connections(new List<Connection>()
{
new Connection<ApplicationUpdateCommand>
(
new ConnectionName("Application.Update"),
new ChannelName("Application.Update"),
new RoutingKey("Application.Update")
)
}).Build();
});
dispatcher.Receive();
Console.WriteLine("Press enter to stop ...");
Console.ReadLine();
dispatcher.End().Wait();
}
MessageMapper、命令和处理程序的代码:
public class ApplicationUpdateCommandMessageMapper : IAmAMessageMapper<ApplicationUpdateCommand>
{
public Message MapToMessage(ApplicationUpdateCommand request)
{
var header = new MessageHeader(messageId: request.Id, topic: "Application.Update", messageType: MessageType.MT_EVENT);
var body = new MessageBody(JsonConvert.SerializeObject(request));
var message = new Message(header, body);
return message;
}
public ApplicationUpdateCommand MapToRequest(Message message)
{
// dispatcher will route message here but that is it
ApplicationUpdateCommand command = JsonConvert.DeserializeObject<ApplicationUpdateCommand>(message.Body.Value);
return command;
}
}
public class ApplicationUpdateCommand : Command
{
public int ApplicationId { get; private set; }
public string ApplicantName { get; private set; }
public ApplicationUpdateCommand(Guid id, int applicationId, string applicantName)
: base(id)
{
ApplicationId = applicationId;
ApplicantName = applicantName;
}
}
public class ApplicationUpdateCommandHandler : RequestHandler<ApplicationUpdateCommand>
{
private readonly IAmACommandProcessor _commandProcessor;
public ApplicationUpdateCommandHandler(IAmACommandProcessor commandProcessor)
{
_commandProcessor = commandProcessor;
}
public override ApplicationUpdateCommand Handle(ApplicationUpdateCommand command)
{
// would like to get here to handle command
return base.Handle(command);
}
}
最佳答案
您在 header 中标识为 MessageType.MT_EVENt,但派生自 Command。两者应该一致,要么派生自Event,要么使用MT_COMMAND
关于c# - Brighter Consumer 消息未路由到处理程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46333455/