java - RabbitMQ + Spring 集成 : Message conversion from queue bound to a topic exchange

标签 java rabbitmq spring-integration spring-amqp spring-rabbit

我有两个组件(我们称之为生产者和消费者)连接到同一个基于 RabbitMQ 主题的交换。

生产者可以发送两种不同的消息类型; FooBar(每条消息的内容无关,但我们只能说它们都有一个 id 字段)。每条消息使用的路由键分别是 msg.foomsg.bar。生产者使用 Jackson2JsonMessageConverter,而不是依赖默认的 Java 序列化。

消费者有一个队列,该队列绑定(bind)到同一个交换器,路由键为msg.#。一旦使用,消费者所要做的就是在日志文件中打印每条消息的id。为了检索 id 字段的值,需要将 JSON 负载转换为某种对象。

消息类(FooBar)不在两个组件之间共享。消费者的对象在其消息表示中可能具有更多或更少的字段。这很好,在这种情况下,任何空字段都可以设置为 null。

是否有一种优雅的方式将这些消息从 JSON 转换/序列化为 FooBar 类型的对象?我能想到的唯一解决方案是手动编写读取 amqp_receivedRoutingKey 或 json__TypeId__ header 的代码,以确定对象类型。示例:

@Bean
public IntegrationFlow inbound(ConnectionFactory connectionFactory, Queue queue) {
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queue))
        .handle(message -> {
            final byte[] payload = (byte[]) message.getPayload();
            final String payloadStr = new String(payload, Charset.defaultCharset());

            final String routingKey = (String) message.getHeaders().get("amqp_receivedRoutingKey");

            try {
                if (routingKey.equals("msg.foo")) {
                    final Foo foo = new Jackson2JsonObjectMapper().fromJson(payloadStr, Foo.class);

                    System.out.println("FOO id: " + foo.getId());
                } else if (routingKey.equals("msg.bar")) {
                    final Bar bar = new Jackson2JsonObjectMapper().fromJson(payloadStr, Bar.class);

                    System.out.println("BAR id: " + bar.getId());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        })
        .get();
}

不幸的是,由于重复的 if/else 子句,它非常不稳定且很难看。我是否缺少任何 Spring Integration 技巧,可以使我的代码更具可读性?

我设法找到了一个类似的问题( spring boot rabbitmq MappingJackson2MessageConverter custom object conversion ),但除了解决方案不起作用之外,它是非常特定于 RabbitMQ 的。我宁愿与 RabbitMQ 无关,并尽可能使用标准 AMQP 类/导入。

最佳答案

您似乎忽略了 Jackson2JsonMessageConverter 在 AMQP 消息中填充适当 header 的事实。这个可以依靠这些 header 在消费者端恢复 POJO。

因此,请考虑使用 Jackson2JsonMessageConverter 配置您的 Amqp.inboundAdapter(connectionFactory, queue)

在文档中查看有关转换器的更多信息:https://docs.spring.io/spring-amqp/docs/2.2.0.RC1/reference/html/#json-message-converter

关于java - RabbitMQ + Spring 集成 : Message conversion from queue bound to a topic exchange,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57841169/

相关文章:

locking - ZooKeeper 和 RabbitMQ/Qpid 结合在一起——是杀伤力大还是一个好的组合?

java - Spring SFTP入站 channel 适配器删除本地文件

java - 如何使用 Spring 集成 DSL 进行 Ftp 轮询

java - 我的容器如何注入(inject)未标记为@stateful/@stateless等的bean?

java - 如何将 .txt 文件添加到 JTextArea 以及放置我的 .txt 文件的位置?

java - 使用 RabbitMQ 和 JMS API 进行事务管理

java - 使用 spEL 的多队列 RabbitListener

java - 你能@Autowired一个@MessageGateway到RestController中吗

2^31 次方的 Java 指数错误

java - 我如何使用 hibernate 条件查询将两个属性连接成一个属性