java - 如何在 Spring Integration 中对 Rabbit MQ 消息监听器强制执行严格排序?

标签 java spring rabbitmq spring-integration spring-amqp

我有一个 Spring Integration 项目,我可以在其中从 RabbitMQ 队列发送和接收消息。

系统发布消息的顺序是可以的,但是之后接收消息的顺序是不正确的。

所以我找到了这段( https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering )并配置了监听器: simpleMessageListenerContainer.setPrefetchCount(1);

我们进行了一些测试,效果良好。然而,大约一周后,它开始出现类似的排序问题。

让我解释一下:

我在一个 Spring 集成应用程序中有两个流 (IntegrationFlow)。

在第一个 IntegrationFlow 中,它创建消息并将每条消息发布到兔子队列中。

在发布之前,它会记录每条消息,我可以确认 sequenceNumber 按预期递增(在我的例子中为 1,2,3,4,5,6,7,8,9, 10,11)。

然后在第二个流程中消费这些已发布的消息。收到每条消息后,流程会立即再次记录该消息。在这里,我发现 sequenceNumber 没有按预期增加(在我的例子中为 1,3,5,7,2,4,6,8,9,10,11)。

对于此应用程序来说,以正确的顺序处理消息非常重要。

当我查看rabbit的用户界面时,我发现了以下内容(其中大部分都是我所期望的):

  • rabbit 有 3 个连接(用于 3 个 Java 应用程序)
  • 我的应用程序的连接有 3 个 channel 。其中 2 个空闲/没有消费者,1 个有 6 个订阅者且预取计数为 1。
  • 每个订阅者的预取计数均为 1
  • 我只关心其中 1 个订阅者(队列)。
  • 此队列具有“需要确认”属性,而不是“独占”属性。

我没想到我的应用程序连接中有 3 个 channel 。我自己没有配置,也许 Spring Integration/AMQP 为我做了这个。

现在,我认为另一个 channel 可能会变得活跃,这会导致排序问题。但我在日志记录中找不到这个。并且不在配置中。

代码片段:

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(final ConnectionFactory connectionFactory,
                                                                         final Jackson2JsonMessageConverter jackson2MessageConverter,
                                                                         final MethodInterceptor retryInterceptor) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.setMessageConverter(jackson2MessageConverter);
        simpleMessageListenerContainer.setAdviceChain(retryInterceptor);
        // force FIFO ordering (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering):
        simpleMessageListenerContainer.setPrefetchCount(1);
        simpleMessageListenerContainer.setConcurrency();
        return simpleMessageListenerContainer;
    }

    @Bean
    public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
                                                         final Queue q1, final Queue q2, final Queue q3,
                                                         final Queue q4, final Queue q5,
                                                         final Queue q6) {
        simpleMessageListenerContainer.setQueues(q1, q2, q3, q4, q5, q6);
        return IntegrationFlows.from(
                Amqp.inboundAdapter(simpleMessageListenerContainer)
                        .messageConverter(jackson2MessageConverter))
                .log(LoggingHandler.Level.DEBUG, "com.my.thing")
                .headerFilter(MyMessageHeaders.QUEUE_ROUTING_KEY)
                .route(router())
                .get();
    }

    private HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
        router.setChannelMapping(AmqpConfiguration.Q1_NAME, Q1_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q2_NAME, Q2_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q3_NAME, Q3_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q4_NAME, Q4_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q5_NAME, Q5_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q6_NAME, Q6_CHANNEL);
        router.setResolutionRequired(false);
        router.setDefaultOutputChannelName("errorChannel");
        return router;
    }

发布:

    @Bean
    public IntegrationFlow prepareForUpload(final Handler1 handler1) {
        BinaryFileSplitter binaryFileSplitter = new BinaryFileSplitter(true);
        binaryFileSplitter.setChunkSize(chunksize);

        return IntegrationFlows
                .from(aFlow)
                .handle(handler1)
                .split(binaryFileSplitter)
                .log(LoggingHandler.Level.TRACE, "com.my.log.identifyer")
                // Send message to the correct AMQP queue after successful processing
                .enrichHeaders(h -> h.header(QUEUE_ROUTING_KEY, AmqpConfiguration.Q4_NAME))
                .channel(MyChannels.AMQP_OUTPUT)
                .get();
    }
    @Bean
    public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate, final UpdateDb updateDb) {
        return IntegrationFlows.from(MyChannels.AMQP_OUTPUT)
                .log(LoggingHandler.Level.DEBUG, "com.my.log.identify")
                .handle(updateDb)
                .handle(Amqp.outboundAdapter(amqpTemplate)
                        .exchangeName(AmqpConfiguration.THE_TOPIC_EXCHANGE)
                        .routingKeyExpression("headers['queueRoutingKey']"))
                .get();
    }

接收:

    @Bean
    public IntegrationFlow handleReceivedMessages() {
        return IntegrationFlows
                .from(Q4_CHANNEL)
                .log(LoggingHandler.Level.DEBUG, "com.my.log.identifyer")
                .handle(..)
                .aggregate(a -> a.releaseStrategy(new ChunkReleaseStrategy()))
                .transform(..)
                ....(..)..
                ...

最佳答案

正如您指出的文档中所讨论的,您需要将 BoundRabbitChannelAdvice 添加到分离器,以便所有下游流使用相同的 channel 。

        @Bean
        public IntegrationFlow flow(RabbitTemplate template) {
            return IntegrationFlows.from(Gate.class)
                    .split(s -> s.delimiters(",")
                            .advice(new BoundRabbitChannelAdvice(template)))
                    .<String, String>transform(String::toUpperCase)
                    .handle(Amqp.outboundAdapter(template).routingKey("rk"))
                    .get();
        }

关于java - 如何在 Spring Integration 中对 Rabbit MQ 消息监听器强制执行严格排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61506367/

相关文章:

具有非最终函数参数的 Java Lambda 表达式

Spring 规范 'and' 不起作用

java - 从测试用例调用 Controller 时,使用自动连线组件测试 Controller 为空

rabbitmq - Spring+RabbitMQ 使队列不持久

java - 将 Spring Cloud Stream 与 Rabbitmq 结合使用,交换类型为 "headers"

java - 我将如何优化此外观以获得唯一的数字算法功能

java - 为什么某些 java 代码/quartz 代码需要 .class 文件?

java - 在后台发送电子邮件 - Android

spring - 获取 SDG 2.0 中的注释,获取策略问题

rabbitmq - RabbitMQ 的典型版本控制策略是什么?