java - Spring AMQP AcknowledgeMode.AUTO 工作缓慢

标签 java spring amqp spring-amqp

我有一个生产者,它每秒向 RabbitMQ 发送 20 条消息,我还有一个消费者,它应该以与生产消息相同的速度接收消息。

有一些条件我必须实现:

  1. 每秒生成和使用 20 条消息。
  2. 保存生产订单。
  3. 消息不应丢失(这就是我使用 AcknowledgeMode.AUTO 的原因)。

当我使用 Spring AMQP 实现 (org.springframework.amqp.rabbit) 时,我的消费者每秒最多处理 6 条消息。但是,如果我使用 native AMQP 库 (com.rabbitmq.client),它会每秒处理所有 20 条消息,同时使用 ack - 自动和手动。

问题是:

为什么 Spring 在 Consumer 案例中的实现工作如此缓慢,我该如何解决这个问题?

如果我设置 prefetchCount(20),它会根据需要工作,但我不能使用预取,因为它会在拒绝情况下破坏顺序。

Spring amqp:

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqServer);
    connectionFactory.setUsername(rabbitMqUsername);
    connectionFactory.setPassword(rabbitMqPassword);
    return connectionFactory;
}

...

private SimpleMessageListenerContainer createContainer(Queue queue, Receiver receiver, AcknowledgeMode acknowledgeMode) {
    SimpleMessageListenerContainer persistentListenerContainer = new SimpleMessageListenerContainer();
    persistentListenerContainer.setConnectionFactory(connectionFactory());
    persistentListenerContainer.setQueues(queue);
    persistentListenerContainer.setMessageListener(receiver);
    persistentListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    return persistentListenerContainer;
}

...

@Override
public void onMessage(Message message) {saveToDb}

最佳答案

Spring AMQP(2.0 之前)默认预取为 1,正如您所说,即使在拒绝后也能保证顺序。

默认情况下, native 客户端不应用 basicQos(),这实际上意味着它具有无限预取。

所以你不是在比较苹果和苹果。

使用 native 客户端尝试 channel.basicQos(1),您应该会看到与默认 spring amqp 设置类似的结果。

编辑

当比较苹果与苹果时,无论是否使用框架,我都会得到相似的结果......

@SpringBootApplication
public class So47995535Application {

    public static void main(String[] args) {
        SpringApplication.run(So47995535Application.class, args).close();
    }

    private final CountDownLatch latch = new CountDownLatch(100);

    private int nativeCount;

    private int rlCount;

    @Bean
    public ApplicationRunner runner(ConnectionFactory factory, RabbitTemplate template,
            SimpleMessageListenerContainer container) {
        return args -> {
            for (int i = 0; i < 100; i++) {
                template.convertAndSend("foo", "foo" + i);
            }
            container.start();
            Connection conn = factory.createConnection();
            Channel channel = conn.createChannel(false);
            channel.basicQos(1);
            channel.basicConsume("foo", new DefaultConsumer(channel) {

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    System.out.println("native " + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    nativeCount++;
                    latch.countDown();
                }

            });
            latch.await(60, TimeUnit.SECONDS);
            System.out.println("Native: " + this.nativeCount + " LC: " + this.rlCount);
            channel.close();
            conn.close();
            container.stop();
        };
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("foo");
        container.setPrefetchCount(1);
        container.setAutoStartup(false);
        container.setMessageListener((MessageListener) m -> {
            System.out.println("LC " + new String(m.getBody()));
            this.rlCount++;
            this.latch.countDown();
        });
        return container;
    }

}

Native: 50 LC: 50

关于java - Spring AMQP AcknowledgeMode.AUTO 工作缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47995535/

相关文章:

java - 如何解决这个类型删除?

java - 在单例类中并发调用方法

java - 使用ttf字体使文本宽度灵活

java - 为什么Method的getAnnotation方法需要类作为参数?

java - Spring表达式语言: Get absolute value

java - Wildfly 与 Tomcat 上的 fastxml.jackson 版本

java - hibernateTemplate bulkUpdate上的“缺少SET关键字”

java - ServiceBus over AMQP 关闭消费者

erlang - 使 RabbitMQ 代理上的非事件连接过期

c++ - 如何使用依赖于另一个静态库的静态库进行编译