java - 无法同时从 ActiveMQ 嵌入式代理进行消费

标签 java jms activemq

我正在寻找一种方法,将 10 条消息发布到 ActiveMQ 嵌入式代理,并在同一虚拟机上使用 JMS API 同时使用它们。

下面的代码存在某种竞争,因为有时它会并行消耗 2、4、8 条消息,并挂起直到 latch.await 调用超时。

public final class ActiveMQJMSParallelTest {
    private static final Logger logger = LoggerFactory.getLogger(ActiveMQJMSParallelTest.class);
    private static final int numberOfMessages = 10;

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        props.setProperty(Context.PROVIDER_URL, "vm://localhost?broker.persistent=false");
        props.setProperty("queue.parallelQueue", "parallelQueue");
        final Context jndiContext = new InitialContext(props);
        final ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
        final Destination destination = (Destination) jndiContext.lookup("parallelQueue");
        final Connection connection = connectionFactory.createConnection();
        Session session = null;
        try {
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < numberOfMessages; i++) {
                final TextMessage message = session.createTextMessage();
                message.setText("This is message " + (i + 1));
                producer.send(message);
                logger.info("Produced message: {}", message);
            }
            session.commit();
        } finally {
            if (session != null)
                session.close();
        }

        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
        final ExecutorService pool = Executors.newFixedThreadPool(numberOfMessages);
        for (int i = 0; i < numberOfMessages; i++) {
            pool.submit(new Runnable() {
                @Override public void run() {
                    try {
                        final Connection connection = connectionFactory.createConnection();
                        connection.start();
                        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                        final Queue destination = session.createQueue("parallelQueue");
                        final MessageConsumer consumer = session.createConsumer(destination);
                        final Message received = consumer.receive();
                        logger.info("Consuming message: {}", received);
                        latch.countDown();
                        latch.await(1, TimeUnit.MINUTES);
                        logger.info("Consumed message: {}", received);
                        session.close();
                        connection.close();
                    } catch(Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        latch.await(10, TimeUnit.MINUTES);
        jndiContext.close();
    }
}

有人可以为这个任务想出工作代码吗?

最佳答案

如果您想确保每个消费者都有机会一次获取一条消息,那么您应该使用预取值为零,这样代理就不会在第一个消费者到达时尝试分派(dispatch)达到预取限制的消息,依此类推。

看看预取是如何在 documentation 上工作的页。

关于java - 无法同时从 ActiveMQ 嵌入式代理进行消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28287635/

相关文章:

java - spring security在哪里取值来替换通配符?

Java - 允许用户将节点添加到链表的方法

java - 对于独立应用程序(对于 Spring JMS),Java main 方法应该是什么?

node.js - 无法使用 Node.js 连接到 Apache ActiveMQ

java - 从 jquery 序列化数组发布 jsp servlet 数组

java - Runnable 实现的类是什么样子的?

java - 部署在使用 JMS/AMQP 消息的 Cloud Foundry 上的应用程序

java - 如何从命令行搜索 Jar 以查看它是否包含类

java - RabbitMQ 真的可以与 Amazon MQ (ActiveMQ) 配合使用吗?

java - ActiveMQ 消费者挂起