java - JMS - 从一个消费者到多个消费者

标签 java jms message-queue messaging

我有一个 JMS 客户端,它正在生成消息并通过 JMS 队列发送给它的唯一消费者。

我想要的是不止一个消费者收到这些消息。我首先想到的是将队列转换为主题,以便当前和新的消费者可以订阅并将相同的消息传递给他们。

这显然将涉及在生产者和消费者方面修改当前客户端代码。

我还想看看其他选项,例如创建第二个队列,这样我就不必修改现有的消费者。我相信这种方法有一些优点,比如(如果我错了,请纠正我)平衡两个不同队列而不是一个队列之间的负载,这可能会对性能产生积极影响。

我想就您可能会看到的这些选项和缺点/优点获得建议。非常感谢任何反馈。

最佳答案

正如你所说,你有几个选择。

如果将其转换为主题以获得相同的效果,则需要使消费者成为持久消费者。如果您的消费者不活着,队列提供的一件事是持久性。这取决于您使用的 MQ 系统。

如果您想坚持使用队列,您将为每个消费者创建一个队列,并创建一个监听原始队列的调度程序。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

主题的优点

  • 更容易动态添加新的消费者。所有消费者无需任何工作即可收到新消息。
  • 您可以创建循环主题,这样 Consumer_1 将收到一条消息,然后是 Consumer_2,然后是 Consumer_3
  • 可以向消费者推送新消息,而不必查询队列以使其 react 。

主题的缺点

  • 除非您的代理支持此配置,否则消息不会持久化。如果消费者离线并回来,则可能会丢失消息,除非设置了持久消费者。
  • 难以让 Consumer_1 和 Consumer_2 接收消息,但不能让 Consumer_3 接收消息。使用 Dispatcher 和 Queues,Dispatcher 无法将消息放入 Consumer_3 的队列中。

队列的优点

  • 在消费者删除它们之前,消息是持久的
  • 调度程序可以通过不将消息放入相应的消费者队列来过滤哪些消费者获得了哪些消息。不过,这可以通过过滤器处理主题。

队列的缺点

  • 需要创建额外的队列来支持多个消费者。在动态环境中,这不会是有效的。

在开发消息传递系统时,我更喜欢主题,因为它给了我最大的权力,但是鉴于您已经在使用队列,它需要您更改系统的工作方式来实现主题。

多消费者队列系统的设计与实现

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

来源

请记住,您还需要处理其他一些事情,例如问题异常处理、重新连接到连接以及在失去连接时排队等。这只是为了让您了解如何处理完成我所描述的。

在真实系统中,我可能不会在第一个异常时退出。我会允许系统继续尽其所能地运行并记录错误。在这段代码中,如果将消息放入单个消费者队列失败,整个调度程序将停止。

Dispatcher.java

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}

Main.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();

关于java - JMS - 从一个消费者到多个消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4615744/

相关文章:

java - Web Script/share/page/api/javascript/debugger 已响应状态 500

azure - 如何从 Spring JMS 设置 Azure ServiceBus 的 ContentType

jms - JMS中基于定义时间的消息调度/消费

java - XmlBlaster 的优点和缺点

安卓2.1 : Multiple Handlers in a Single Activity

java - TFS eclipse 插件无法 checkin 跨项目内多个文件夹的更改

java - Java 中 Hashmap 的可变性

JMS主题订阅Tomee 1.7.1

queue - 分布式限速

java - 在servlet 3.0中使用Log4j2