我有一个 Spring Integration 项目,我可以在其中从 RabbitMQ 队列发送和接收消息。
系统发布消息的顺序是可以的,但是之后接收消息的顺序是不正确的。
所以我找到了这段( https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering )并配置了监听器: simpleMessageListenerContainer.setPrefetchCount(1);
。
我们进行了一些测试,效果良好。然而,大约一周后,它开始出现类似的排序问题。
让我解释一下:
我在一个 Spring 集成应用程序中有两个流 (IntegrationFlow
)。
在第一个 IntegrationFlow
中,它创建消息并将每条消息发布到兔子队列中。
在发布之前,它会记录每条消息,我可以确认 sequenceNumber
按预期递增(在我的例子中为 1,2,3,4,5,6,7,8,9, 10,11)。
然后在第二个流程中消费这些已发布的消息。收到每条消息后,流程会立即再次记录该消息。在这里,我发现 sequenceNumber
没有按预期增加(在我的例子中为 1,3,5,7,2,4,6,8,9,10,11)。
对于此应用程序来说,以正确的顺序处理消息非常重要。
当我查看rabbit的用户界面时,我发现了以下内容(其中大部分都是我所期望的):
- rabbit 有 3 个连接(用于 3 个 Java 应用程序)
- 我的应用程序的连接有 3 个 channel 。其中 2 个空闲/没有消费者,1 个有 6 个订阅者且预取计数为 1。
- 每个订阅者的预取计数均为 1
- 我只关心其中 1 个订阅者(队列)。
- 此队列具有“需要确认”属性,而不是“独占”属性。
我没想到我的应用程序连接中有 3 个 channel 。我自己没有配置,也许 Spring Integration/AMQP 为我做了这个。
现在,我认为另一个 channel 可能会变得活跃,这会导致排序问题。但我在日志记录中找不到这个。并且不在配置中。
代码片段:
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(final ConnectionFactory connectionFactory,
final Jackson2JsonMessageConverter jackson2MessageConverter,
final MethodInterceptor retryInterceptor) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setMessageConverter(jackson2MessageConverter);
simpleMessageListenerContainer.setAdviceChain(retryInterceptor);
// force FIFO ordering (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering):
simpleMessageListenerContainer.setPrefetchCount(1);
simpleMessageListenerContainer.setConcurrency();
return simpleMessageListenerContainer;
}
@Bean
public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
final Queue q1, final Queue q2, final Queue q3,
final Queue q4, final Queue q5,
final Queue q6) {
simpleMessageListenerContainer.setQueues(q1, q2, q3, q4, q5, q6);
return IntegrationFlows.from(
Amqp.inboundAdapter(simpleMessageListenerContainer)
.messageConverter(jackson2MessageConverter))
.log(LoggingHandler.Level.DEBUG, "com.my.thing")
.headerFilter(MyMessageHeaders.QUEUE_ROUTING_KEY)
.route(router())
.get();
}
private HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
router.setChannelMapping(AmqpConfiguration.Q1_NAME, Q1_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q2_NAME, Q2_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q3_NAME, Q3_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q4_NAME, Q4_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q5_NAME, Q5_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q6_NAME, Q6_CHANNEL);
router.setResolutionRequired(false);
router.setDefaultOutputChannelName("errorChannel");
return router;
}
发布:
@Bean
public IntegrationFlow prepareForUpload(final Handler1 handler1) {
BinaryFileSplitter binaryFileSplitter = new BinaryFileSplitter(true);
binaryFileSplitter.setChunkSize(chunksize);
return IntegrationFlows
.from(aFlow)
.handle(handler1)
.split(binaryFileSplitter)
.log(LoggingHandler.Level.TRACE, "com.my.log.identifyer")
// Send message to the correct AMQP queue after successful processing
.enrichHeaders(h -> h.header(QUEUE_ROUTING_KEY, AmqpConfiguration.Q4_NAME))
.channel(MyChannels.AMQP_OUTPUT)
.get();
}
@Bean
public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate, final UpdateDb updateDb) {
return IntegrationFlows.from(MyChannels.AMQP_OUTPUT)
.log(LoggingHandler.Level.DEBUG, "com.my.log.identify")
.handle(updateDb)
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName(AmqpConfiguration.THE_TOPIC_EXCHANGE)
.routingKeyExpression("headers['queueRoutingKey']"))
.get();
}
接收:
@Bean
public IntegrationFlow handleReceivedMessages() {
return IntegrationFlows
.from(Q4_CHANNEL)
.log(LoggingHandler.Level.DEBUG, "com.my.log.identifyer")
.handle(..)
.aggregate(a -> a.releaseStrategy(new ChunkReleaseStrategy()))
.transform(..)
....(..)..
...
最佳答案
正如您指出的文档中所讨论的,您需要将 BoundRabbitChannelAdvice
添加到分离器,以便所有下游流使用相同的 channel 。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlows.from(Gate.class)
.split(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template)))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
关于java - 如何在 Spring Integration 中对 Rabbit MQ 消息监听器强制执行严格排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61506367/