我有两个组件(我们称之为生产者和消费者)连接到同一个基于 RabbitMQ 主题的交换。
生产者可以发送两种不同的消息类型; Foo
和 Bar
(每条消息的内容无关,但我们只能说它们都有一个 id
字段)。每条消息使用的路由键分别是 msg.foo
和 msg.bar
。生产者使用 Jackson2JsonMessageConverter
,而不是依赖默认的 Java 序列化。
消费者有一个队列,该队列绑定(bind)到同一个交换器,路由键为msg.#
。一旦使用,消费者所要做的就是在日志文件中打印每条消息的id
。为了检索 id 字段的值,需要将 JSON 负载转换为某种对象。
消息类(Foo
和 Bar
)不在两个组件之间共享。消费者的对象在其消息表示中可能具有更多或更少的字段。这很好,在这种情况下,任何空字段都可以设置为 null。
是否有一种优雅的方式将这些消息从 JSON 转换/序列化为 Foo
和 Bar
类型的对象?我能想到的唯一解决方案是手动编写读取 amqp_receivedRoutingKey 或 json__TypeId__ header 的代码,以确定对象类型。示例:
@Bean
public IntegrationFlow inbound(ConnectionFactory connectionFactory, Queue queue) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queue))
.handle(message -> {
final byte[] payload = (byte[]) message.getPayload();
final String payloadStr = new String(payload, Charset.defaultCharset());
final String routingKey = (String) message.getHeaders().get("amqp_receivedRoutingKey");
try {
if (routingKey.equals("msg.foo")) {
final Foo foo = new Jackson2JsonObjectMapper().fromJson(payloadStr, Foo.class);
System.out.println("FOO id: " + foo.getId());
} else if (routingKey.equals("msg.bar")) {
final Bar bar = new Jackson2JsonObjectMapper().fromJson(payloadStr, Bar.class);
System.out.println("BAR id: " + bar.getId());
}
} catch (Exception e) {
e.printStackTrace();
}
})
.get();
}
不幸的是,由于重复的 if
/else
子句,它非常不稳定且很难看。我是否缺少任何 Spring Integration 技巧,可以使我的代码更具可读性?
我设法找到了一个类似的问题( spring boot rabbitmq MappingJackson2MessageConverter custom object conversion ),但除了解决方案不起作用之外,它是非常特定于 RabbitMQ 的。我宁愿与 RabbitMQ 无关,并尽可能使用标准 AMQP 类/导入。
最佳答案
您似乎忽略了 Jackson2JsonMessageConverter
在 AMQP 消息中填充适当 header 的事实。这个可以依靠这些 header 在消费者端恢复 POJO。
因此,请考虑使用 Jackson2JsonMessageConverter
配置您的 Amqp.inboundAdapter(connectionFactory, queue)
。
在文档中查看有关转换器的更多信息:https://docs.spring.io/spring-amqp/docs/2.2.0.RC1/reference/html/#json-message-converter
关于java - RabbitMQ + Spring 集成 : Message conversion from queue bound to a topic exchange,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57841169/