java - 如何配置 RabbitMQ 为多个消费者平等地提供多个队列

标签 java concurrency rabbitmq message-queue

我有一个设置了多个队列的 RabbitMQ 代理。在客户端(Java),我有多个消费者,它们都在监听他们的队列,如下所示:

QUEUE_1 -> DataConsumer1; QUEUE_2 -> 数据消费者2 ...

它们都使用一个连接但不同的 channel 。 发生的情况是,当我加载所有队列并启动应用程序代理时,它会先服务一个队列,然后再服务另一个队列,依此类推。因此,消息当时由各自的消费者队列接收。 我还想提一下,我使用预取计数 1 来尝试实现消费者流量的公平分配。

我怎样才能做到这一点,以便所有队列都得到平等的服务。

编辑:这是创建消费者的代码(非常基本)

import com.rabbitmq.client.*;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * Used for consuming and acknowledging messages from defined queue.
 *
 */
public class Consumer {
    private final static Logger logger = Logger.getLogger(Consumer.class);
    // Maximum number of messages that can be on the consumer at a time
    private static int prefetchCount = 1;

    // Internal enum which contains queue names and their exchange keys
    private Queue queue;
    private Channel channel;
    private String consumerTag;
    private String uuid = UUID.randomUUID().toString();
    private boolean subscribed = false;
    private DeliverCallback deliverCallback = this::handleDeliver;
    private CancelCallback cancelCallback = this::handleCancel;
    private ConsumerShutdownSignalCallback consumerShutdownSignalCallback = this::handleShutdown;

    /**
     * The constructors sets the channel to RabbitMQ broker for the specified queue.
     * Callback for events are set to their default implementation.
     *
     * @param queue RabbitMQ queue - this consumer will be assigned to this queue and will only be able to consume from it.
     * @see #setDeliverCallback(DeliverCallback)
     * @see #setCancelCallback(CancelCallback)
     * @see #setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback)
     */
    public Consumer(Queue queue) {
        this.queue = queue;

        try {
            setUpChannel();

        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    public Class getEntityClass() {
        return Queue.getEntityClassForQueue(queue);
    }

    public String getUuid() {
        return uuid;
    }

    public boolean isSubscribed() {
        return subscribed;
    }

    public DeliverCallback getDeliverCallback() {
        return deliverCallback;
    }

    public void setDeliverCallback(DeliverCallback deliverCallback) {
        this.deliverCallback = deliverCallback;
    }

    public CancelCallback getCancelCallback() {
        return cancelCallback;
    }

    public void setCancelCallback(CancelCallback cancelCallback) {
        this.cancelCallback = cancelCallback;
    }

    public ConsumerShutdownSignalCallback getConsumerShutdownSignalCallback() {
        return consumerShutdownSignalCallback;
    }

    public void setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback consumerShutdownSignalCallback) {
        this.consumerShutdownSignalCallback = consumerShutdownSignalCallback;
    }


    /**
     * <p>
     * Subscribes to the set queue. The subscription can be cancelled using
     * Checks if the queue is set up properly.
     * </p>
     * <p>
     * Note: this is a non-blocking operation. The client will listen for incoming messages and handle them using
     * the provided DeliveryCallback function but the execution of this operation will be on another thread.
     * </p>
     *
     * @throws IOException if I/O problem is encountered.
     */
    public void subscribeToQueue() throws IOException {
        if (channel != null) {
            consumerTag = channel.basicConsume(
                    queue.getQueueName(),
                    deliverCallback,
                    cancelCallback,
                    consumerShutdownSignalCallback
            );
            subscribed = true;

        } else {
            logger.error("Channel does not exist. Unable to consume message.");

        }
    }

    /**
     * Confirms the message has been successfully processed.
     *
     * @param deliveryTag Unique message tag generated by the server.
     * @throws IOException if I/O problem is encountered.
     */
    public void acknowledgeMessageReceived(long deliveryTag) throws IOException {
        if (channel != null) {
            channel.basicAck(deliveryTag, false);

        } else {
            logger.error("Channel does not exist. Unable to acknowledge message delivery.");

        }
    }

    /**
     * Sends a negative acknowledgement to RabbitMQ without re-queueing the message.
     *
     * @param deliveryTag Unique message tag generated by the server.
     * @throws IOException if I/O problem is encountered.
     */
    public void rejectMessage(long deliveryTag) throws IOException {
        if (channel != null) {
            channel.basicReject(deliveryTag, false);

        } else {
            logger.error("Channel does not exist. Unable to reject message delivery.");

        }
    }

    /**
     * Cancels consumer subscription to the queue.
     * The consumer can be used for acknowledging messages, but will not receive new messages.
     * This does not close the underlying channel. To close the channel use closeChannel() method.
     *
     * @throws IOException
     * @see #subscribeToQueue()
     * @see #closeChannel()
     */
    public void cancelSubscription() throws IOException {
        if (channel != null) {
            channel.basicCancel(this.consumerTag);
            subscribed = false;

        } else {
            logger.error("Channel does not exist. Unable to cancel consumer subscription.");
        }
    }

    /**
     * Explicitly closes channel to the queue.
     * After doing this you will not be able to use any of the methods of this class.
     *
     * @throws IOException      if I/O problem is encountered.
     * @throws TimeoutException if connection problem occurs.
     */
    public void closeChannel() throws IOException, TimeoutException {
        if (channel != null) {
            channel.close();
            channel = null;
            logger.info("Closing RabbitMQ consumer channel...");

        } else {
            logger.error("Channel already closed.");

        }
    }

    /**
     * Checks if the queue exists and creates the channel.
     * If the queue does not exist channel is set to null and cannot be used.
     *
     * @throws IOException if I/O problem is encountered.
     */
    private void setUpChannel() throws IOException {

        channel = ChannelFactory.getInstance().createChannel();
        try {
            channel.queueDeclarePassive(queue.getQueueName());
            channel.basicQos(prefetchCount);

        } catch (IOException e) {
            // When this exception occurs it renders the channel unusable so it's best set to null.
            channel = null;

            logger.error(String.format("Queue %s does not exist [%s]", queue.getQueueName(), e.getMessage()));
            e.printStackTrace();

        }
        logger.info("Setting up RabbitMQ consumer channel. Channel successfully initialized: " + (channel != null));
    }

    /**
     * Callback called when a message is delivered to the client.
     * Default implementation. Callback acknowledges message received and does nothing with it.
     * To use custom implementation use setDeliverCallback method.
     *
     * @param consumerTag The consumer tag associated with the consumer.
     * @param message     Message object.
     * @see #setDeliverCallback(DeliverCallback)
     */
    private void handleDeliver(String consumerTag, Delivery message) {
        Envelope envelope = message.getEnvelope();
        long deliveryTag = envelope.getDeliveryTag();

        logger.info("Message delivered: " + deliveryTag);

        try {
            channel.basicAck(deliveryTag, false);

        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    /**
     * Callback called when a service is cancelled.
     * Default implementation. To use custom implementation specify it in the constructor.
     *
     * @param consumerTag The consumer tag associated with the consumer.
     */
    private void handleCancel(String consumerTag) {
        logger.info("Consumer (" + consumerTag + ") cancelled: ");
    }

    /**
     * Called when the consumer is abruptly shutdown due to termination of the underlying connection or channel.
     * Default implementation. To use custom implementation specify it in the constructor.
     *
     * @param consumerTag The consumer tag associated with the consumer.
     * @param exception   Shutdown reason.
     */
    private void handleShutdown(String consumerTag, ShutdownSignalException exception) {
        logger.info(String.format("Consumer (%s) shutdown. Reason: %s", consumerTag, exception.getMessage()));
        logger.info(exception);
    }
}

最佳答案

更新:已解决,显然我的预取计数未设置,因此它是无限的。这就是为什么流量被锁定在一个 channel 上,直到队列耗尽为止。

关于java - 如何配置 RabbitMQ 为多个消费者平等地提供多个队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57162592/

相关文章:

在Android中,当OP_WRITE存在时,Java nio OP_READ不执行?

java - Java中的StoreStore内存屏障是否禁止读写重新排序?

postgresql - 警告 : concurrent delete in progress within table while creating index in big table in Postgres 9. 2

java - Java中同步方法的死锁

python - 如何使用 celery 从 rabbit-mq 服务器获取消息?

java - 如何在Javafx的HBox中设置自己的调整大小优先级

java - 跳过 While 条件的 Java Do/While 循环

concurrency - 使用Clojure DataFlow编程习惯

logging - RabbitMQ SASL 日志记录

Python Kombu 消费者未收到 rabbitmq 消息通知(queue.get 有效)