java - Vert.x:处理 Rabbitmq 消费者背压而不丢弃新消息

标签 java rabbitmq rx-java message-queue vert.x

当您收到的消息超出您的处理能力时。解决方案是限制内部队列大小:

// Limit to max 300 messages
QueueOptions options = new QueueOptions()
  .setMaxInternalQueueSize(300);

RabbitMQClient client = RabbitMQClient.create(vertx, new RabbitMQOptions());

client.basicConsumer("my.queue", options, res -> {
  if (res.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer mqConsumer = res.result();
    mqConsumer.handler((RabbitMQMessage message) -> {
      System.out.println("Got message: " + message.body().toString());
    });
  } else {
    res.cause().printStackTrace();
  }
});

问题是当超过内部队列的队列容量时,新消息将被简单地丢弃

如何处理背压而不丢失任何消息?

最佳答案

我最近使用client.basicQos()函数来限制rabbit的消息消耗并且它运行顺利。

此函数的第一个参数确定预取消息的数量保留在您的消费者(或整个 channel )中,因此它应该像内部队列大小一样工作,但不会获取和丢弃消息。

关于java - Vert.x:处理 Rabbitmq 消费者背压而不丢弃新消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56849365/

相关文章:

java - 如何中断 ExecutorService 线程

ubuntu - 在 Ubuntu 上安装 RabbitMq 时出现 Erlang 依赖问题

rabbitmq - RabbitMQ 中的 prefetchSize 是什么?

java - Observable.never() 函数的实际用途是什么?

android - 使用 MVVM android 进行导航

java - Android,拖放后删除拖动的项目

java - 使用 ImageJ 在 Java 应用程序中显示 dicom 图像

java - RX java 在添加项目时发出延迟的列表项目

java - 如何在java程序中将属性文件的所有属性作为初始化变量?

java - 与 Rabbitmq 建立连接的问题