java - Spring AMQP 手动容器异常缓慢

标签 java spring spring-amqp spring-rabbit

当使用 Spring AMQP DSL 并手动创建 MessageListenerContainer 时,其吞吐量比使用 @RabbitListener 方法慢很多。根据我的理解,当使用 @RabbitListener 时,容器是由 BeanPostProcessor 创建的,我正在做的应该是相同的,但由于某种原因不是。我在下面添加了这两种方法来说明我的意思。

我的方法的吞吐量非常慢,使用下面的配置时约为 30/s,而使用注释时,吞吐量非常快。我做错了什么?

应用程序属性

spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.default-requeue-rejected=false
spring.rabbitmq.listener.direct.consumers-per-queue=50
spring.rabbitmq.listener.direct.prefetch=10

DemoConfiguration.java

@Configuration
@EnableRabbit
public class DemoConfiguration {

    @Bean
    // manually creating the container, VERY slow
    public DirectMessageListenerContainer container(final DirectRabbitListenerContainerFactory containerFactory) {
        final DirectMessageListenerContainer listenerContainer = containerFactory.createListenerContainer();
        listenerContainer.setQueueNames("in_queue"); // has 2.000 messages before starting the application
        listenerContainer.setListenerId("listener_in_queue");
        return listenerContainer;
    }

    @Bean
    public IntegrationFlow demoFlow(final DirectMessageListenerContainer container) {
        return IntegrationFlows.from(Amqp.inboundGateway(container))
                // EXTREMELY slow
                .nullChannel();
    }

    // this is working very fast, 1000 messages per second
    /*@RabbitListener(queues = "in_queue")
    public void consume() {

    }*/
}

最佳答案

编辑

问题是您使用的是入站网关,并且消费者线程正在等待答复(默认情况下为 1 秒),该答复永远不会到达。

改为使用 Amqp.inboundAdapter(或将网关上的 replyTimeout 设置为零。

PRE_EDIT

我还不确定为什么,但问题似乎出在 Spring Integration 中,而不是容器中:

@SpringBootApplication
public class So54365437Application {

    private static final Logger logger = LoggerFactory.getLogger(So54365437Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So54365437Application.class, args).close();
    }

    private final AtomicInteger lCount = new AtomicInteger();

    private final AtomicInteger mCount = new AtomicInteger();

    private final AtomicInteger iCount = new AtomicInteger();

    private final AtomicLong t0 = new AtomicLong();

    @RabbitListener(queues = "foo")
    public void listener(Integer in) {
        int n = lCount.incrementAndGet();
        if (n % 100 == 0) {
            logger.info("listener @" + n);
        }
        if (n == 2000) {
            logger.info("listener done @" + rate());
        }
    }

    @Bean
    public DirectMessageListenerContainer container(final DirectRabbitListenerContainerFactory containerFactory) {
        final DirectMessageListenerContainer listenerContainer = containerFactory.createListenerContainer();
        listenerContainer.setQueueNames("bar");
        listenerContainer.setAutoStartup(false);
        listenerContainer.setMessageListener(m -> {
            int n = mCount.incrementAndGet();
            if (n % 100 == 0) {
                logger.info("manual @" + n);
            }
            if (n == 2000) {
                logger.info("manual done @" + rate());
            }
        });
        return listenerContainer;
    }

    @Bean
    public DirectMessageListenerContainer integrationContainer(final DirectRabbitListenerContainerFactory containerFactory) {
        final DirectMessageListenerContainer listenerContainer = containerFactory.createListenerContainer();
        listenerContainer.setQueueNames("baz");
        listenerContainer.setAutoStartup(false);
        return listenerContainer;
    }

    @Bean
    public IntegrationFlow demoFlow(final DirectMessageListenerContainer integrationContainer) {
        return IntegrationFlows.from(Amqp.inboundGateway(integrationContainer).autoStartup(false))
                .handle(p -> {
                    int n = iCount.incrementAndGet();
                    if (n % 100 == 0) {
                        logger.info("integration @" + n);
                    }
                    if (n == 2000) {
                        logger.info("integration done @" + rate());
                    }
                })
                .get();
    }

    private String rate() {
        return "" + 2000000.0 / ((System.currentTimeMillis() - t0.get())) + "/sec";
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, RabbitListenerEndpointRegistry registry,
            DirectMessageListenerContainer container, DirectMessageListenerContainer integrationContainer) {

        return args -> {
            IntStream.range(0, 2000)
                .forEach(i -> {
                    switch(args.getSourceArgs()[1]) {
                    case "listener":
                        template.convertAndSend("foo", i);
                        break;
                    case "manual":
                        template.convertAndSend("bar", i);
                        break;
                    case "integration":
                        template.convertAndSend("baz", i);
                        break;
                    }
                });
            logger.info("All sent; starting container");
            t0.set(System.currentTimeMillis());
            switch(args.getSourceArgs()[1]) {
            case "listener":
                registry.start();
                break;
            case "manual":
                container.start();
                break;
            case "integration":
                integrationContainer.start();
                break;
            }
            System.in.read();
        };
    }

}

listener done @10309.278350515464/sec

manual done @11111.111111111111/sec

integration done @15.578629236413487/sec

关于java - Spring AMQP 手动容器异常缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54365437/

相关文章:

java - 嵌套事务用例中的外部事务未看到数据库中持久保存的更新(JPA、MySQL、Spring Framework 和 Hibernate)

java - 间歇性 NGINX 错误 : client sent invalid chunked body

Java:使用分隔符连接基元数组

java - React 登录页面(客户端)和 spring boot/security(服务器端)

spring - org/springframework/core/NativeDetector java.lang.NoClassDefFoundError : org/springframework/core/NativeDetector issue is comming

spring - 如何通过配置禁用 Rabbit 健康检查

java - Mongodb 无法持久化事务状态,因为 session 事务集合丢失

java - 在 JSP 中验证表单

java - AggregatingMessageHandler 的手动 ACK

java - Spring集成-读取JDBC,发送AMQP消息并COMMIT