jakarta-ee - HornetQ:如何重用 XAConnection 和 XASession

标签 jakarta-ee jboss jms hornetq xa

我在尝试在 JBoss 应用程序中的多个工作人员上重用 XAConnection 和 XASession 时遇到了一些问题。我设法将问题简化为一种方法。它应该能够使用相同的连接和 session 来生产和消费一条消息。目前我的应用程序有很多队列和工作人员,每个工作人员当前正在启动和启动每个自己的连接和 session ,而不是共享它。那不应该吗?

这是我的代码示例:

import org.apache.log4j.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.jms.*;
import javax.jms.Queue;
import javax.naming.InitialContext;

@Singleton
@Startup
public class QueueTest {

    private Logger logger = Logger.getLogger(QueueTest.class);

    @PostConstruct
    public void startup() {
        try {
            String queue = "queue/Queue1";
            String message = "test";

            //setting up connection
            InitialContext iniCtx = new InitialContext();
            XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA");
            XAConnection connection = qcf.createXAConnection();
            connection.start();
            logger.debug("creating connection at " + new java.util.Date());

            //setting up session
            XASession session = connection.createXASession();
            logger.debug("creating session at " + new java.util.Date());

            //find the queue
            Object queueObj = iniCtx.lookup(queue);
            Queue jmsQueue = (javax.jms.Queue)queueObj;

            //adding message to queue
            javax.jms.MessageProducer producer = session.createProducer(jmsQueue);
            javax.jms.TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
            producer.close();
            logger.debug("Message added to queue");

            //receiving message from queue
            javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue);
            javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000);

            if (messageReceived==null)
                throw new Exception("No message reveived");

            logger.debug("Got message:"+messageReceived.getText());
            consumer.close();
        }
        catch(Exception e) {
            logger.debug("Error: " + e.getMessage(), e);
        }
    }

    @PreDestroy
    public void shutdown() {

    }
}

结果是这样的输出:
11:47:17,905 DEBUG [QueueTest] (MSC service thread 1-8) creating connection at Thu Sep 05 11:47:17 CEST 2013
11:47:18,041 DEBUG [QueueTest] (MSC service thread 1-8) creating session at Thu Sep 05 11:47:18 CEST 2013
11:47:18,065 DEBUG [QueueTest] (MSC service thread 1-8) Message added to queue
11:47:23,081 DEBUG [QueueTest] (MSC service thread 1-8) Error: No message reveived

如您所见,消费者没有收到任何消息。为什么?

编辑 1:
package dk.energimidt.uapi.zigbee.services;

import org.apache.log4j.Logger;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Queue;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.naming.InitialContext;

@TransactionAttribute(TransactionAttributeType.REQUIRED)
@Stateless
public class QueueTestWorkerBean implements QueueTestWorker {

    private Logger logger = Logger.getLogger(QueueTestWorkerBean.class);

    public void run() {
        try {
            String queue = "queue/Queue1";
            String message = "test";

            //setting up connection
            InitialContext iniCtx = new InitialContext();
            XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA");
            XAConnection connection = qcf.createXAConnection();
            connection.start();
            logger.debug("creating connection at " + new java.util.Date());

            //setting up session
            XASession session = connection.createXASession();
            logger.debug("creating session at " + new java.util.Date());

            //find the queue
            Object queueObj = iniCtx.lookup(queue);
            Queue jmsQueue = (javax.jms.Queue)queueObj;

            //adding message to queue
            javax.jms.MessageProducer producer = session.createProducer(jmsQueue);
            javax.jms.TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
            producer.close();
            session.commit();
            logger.debug("Message added to queue");

            //receiving message from queue
            javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue);
            javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000);

            if (messageReceived==null)
                throw new Exception("No message reveived");

            logger.debug("Got message:"+messageReceived.getText());
            consumer.close();

            connection.close();
        }
        catch(Exception e) {
            logger.debug("Error: " + e.getMessage(), e);
        }
    }
}

现在我在 Session.Commit() 上得到一个异常:
10:46:03,697 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating connection at Tue Sep 17 10:46:03 CEST 2013
10:46:04,343 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating session at Tue Sep 17 10:46:04 CEST 2013
10:46:04,355 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) Error: XA connection: javax.jms.TransactionInProgressException: XA connection
    at org.hornetq.ra.HornetQRASession.commit(HornetQRASession.java:386)
    at QueueTestWorkerBean.run(QueueTestWorkerBean.java:45) [library-1.0.0.jar:]

最佳答案

我在那里看到一些(实际上是 2 个)混淆:

i - 您正在使用 XA session ,但您没有声明任何事务边界……这通常是在 session Bean 和 MDB 上完成的。我不确定你可以在这个无状态的人上做到这一点。

如果您不使用任何声明性事务,则必须手动登记 XID。

ii - jmsXA 是默认的资源适配器连接工厂。它上面已经有一个游泳池。因此,每当您创建新 session 时,您都会从池中取出。当您关闭它时,您将把它放回游泳池。

您可以使用常规的连接工厂。就在 InVMConnectionFactory 中(或者在 PooledConnectionFactories 之外,假设您在 JBoss 上,或者您在独立设备上定义的任何内容……然后只使用常规 JMS。

即使是常规的连接工厂也可以与 XA 一起使用,但在这种情况下,您需要确保直接使用事务管理器的 api 来登记它。

如果您使用常规连接工厂,则可以根据需要保持连接。

请让我知道它是怎么回事,我会帮助你。我知道你开始赏金了..但我会免费回答它:)

我在 EJB 教程中找不到任何关于在单例中使用事务的示例。

我建议您通过 Statless 或 Stateful Session Bean 使用它,然后将 @TransactionAttribute 应用于 Bean。

Java EE 6 教程有一些关于它的很好的信息:

http://docs.oracle.com/javaee/6/tutorial/doc/bncij.html

请注意,在您提交之前,消息将不可用。因此,如果您在交易中发送消息,您将无法在同一个交易中接收到它。

在您的 edit1 示例中,您正在发送一条消息并在同一个事务中使用它。这将不起作用,因为您首先需要提交生产方法,然后才能使用它。在这种情况下,您将需要两个事务,因此 Edit1 已损坏。

另外:确保最后关闭连接。由于您使用的是 JmsXA(或池化连接工厂),您将由应用程序服务器自动完成轮询。

关于jakarta-ee - HornetQ:如何重用 XAConnection 和 XASession,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18633281/

相关文章:

jboss - 作业完成后,Jenkins杀死JBoss服务器

JavaEE :links to learn JBoss/EJB and JMS

java - 如何确定 JMS 连接是否存在?

java - 在构造函数或后方法中初始化 CDI 注入(inject)组件?

java - UserTransaction.SetTransactionTimeout 不起作用?

java - 从servlet获取图像到JSP

带有 MongoDB 的 Java 堆空间

java - Windows 更新时 JBoss 服务关闭

java - 在 spring MVC 中在此服务器上找不到请求的 URL/保存

java - 不稳定的 javax.jms.JMSException : Peer disposed