java - JBoss EAP6 + HornetQ - 不确定如何创建到 HornetQ 的队列连接

标签 java spring jboss jms hornetq

我构建了一个 Spring JMS 解决方案,该解决方案是针对 JBoss 7.1.1 和外部 HornetQ 实现 2.2.1.4 构建的。该方案能够成功建立连接并正常运行。

但是我现在使用 EAP6,并尝试连接到 EAP6 中打包的内部 HornetQ。

我有一些管理连接和创建队列的类。这些类在连接外部 HornetQ 时工作良好,但似乎都无法用于连接 EAP6 内置的 HornetQ。

我已经向 Redhat 提出了这个问题,他们不确定如何解决,因为这也需要 Spring 编码。

我遇到的问题是,我相信我需要创建一个QueueConnection,如QueueConnection qcon =queueConnectionFactory.createQueueConnection("user","password");

但是我们在Spring中实现它的方式是使用Spring JmsTemplate,而向其添加队列连接的概念不可用,所以它不起作用。

下面是包含所需 Spring beans 的 jms-services.xml 文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <!-- Annotation processing, component scanning and @Scheduled/@Async support. -->
    <context:annotation-config />
    <context:component-scan base-package="com.myproject.test" />
    <task:annotation-driven />

    <!-- JTA transaction manager looked up by name; drives @Transactional below. -->
    <bean id="testTransactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManagerName" value="java:/TransactionManager" />
        <property name="autodetectUserTransaction" value="false" />
    </bean>

    <tx:annotation-driven transaction-manager="testTransactionManager" />

    <!-- Project wrapper that builds the HornetQ connection factory.
         Uses the (host, port, useJta, useCluster) constructor. -->
    <bean id="queueConnectionFactory" class="com.myproject.test.impl.QueueConnectionFactoryImpl">
        <constructor-arg type="String" name="host" value="231.7.7.7" />
        <constructor-arg type="int" name="port" value="9876" />
        <constructor-arg type="boolean" name="useJta" value="true" />
        <constructor-arg type="boolean" name="useCluster" value="true" />
    </bean>

    <!-- Queue manager that owns the JmsTemplate bound to TestQueue. -->
    <bean id="testQueueManager" class="com.myproject.test.impl.QueueManagerImpl">
        <constructor-arg ref="queueConnectionFactory" />
        <constructor-arg name="queue" value="TestQueue" />
    </bean>

</beans>

这是我的QueueConnectionFactoryImpl类:

package com.myproject.test.impl;

import java.util.HashMap;
import java.util.Map;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.jboss.logging.Logger;

import com.myproject.test.QueueConnectionFactory;

public class QueueConnectionFactoryImpl implements QueueConnectionFactory {

    private String host;
    private int port;
    private ConnectionFactory connectionFactory;
    private Logger logger;
    private boolean useJta = false;

    public QueueConnectionFactoryImpl(String host, int port, boolean useJta)
    {
        this.useJta = useJta;
        createConnection(host, port);
    }

    public QueueConnectionFactoryImpl(String host, int port) {

        createConnection(host, port);
    }

    public QueueConnectionFactoryImpl(String host, int port, boolean useJta, boolean useCluster)
    {
        this.useJta = useJta;
        if(useCluster)
            createClusterConnection(host, port);
        else
            createConnection(host, port);
    }

    private void createConnection(String host, int port) {

        logger = Logger.getLogger(this.getClass());

        this.host = host;
        this.port = port;

        Map<String, Object> connectionParams = new HashMap<String, Object>();
        connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
        connectionParams.put(TransportConstants.HOST_PROP_NAME, host);  

        TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(),  connectionParams);

        JMSFactoryType jmsFType = JMSFactoryType.CF; 

        if(useJta)
            jmsFType = JMSFactoryType.XA_CF;

        connectionFactory = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(jmsFType, transportConfiguration);

    }

    private void createClusterConnection(String host, int port)
    {
        logger = Logger.getLogger(this.getClass());

        this.host = host;
        this.port = port;

        JMSFactoryType jmsFType = JMSFactoryType.CF; 

        if(useJta)
        jmsFType = JMSFactoryType.XA_CF;

        connectionFactory = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithHA(new DiscoveryGroupConfiguration(host, port), jmsFType);

    }

    public QueueConnectionFactoryImpl(Object connectionFactory)
    {
        logger = Logger.getLogger(this.getClass());
        logger.debug("Object is: "+connectionFactory);
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public boolean isUseJta() {
        return useJta;
    }

    public void setUseJta(boolean useJta) {
        this.useJta = useJta;
    }

}

这是我的QueueManagerImpl代码

package com.myproject.test.impl;

import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Queue;

import org.apache.log4j.Logger;
import org.hornetq.api.jms.HornetQJMSClient;
import org.springframework.jms.core.JmsTemplate;

import com.myproject.test.QueueConnectionFactory;
import com.myproject.test.QueueManager;

/**
 * Owns a {@link JmsTemplate} configured against the connection factory supplied by a
 * {@link QueueConnectionFactory} and, optionally, bound to a default queue.
 *
 * <p>Configuration failures are logged rather than rethrown, so an instance may exist
 * with a partially configured template; check the log when sends fail.
 */
public class QueueManagerImpl implements QueueManager {

    private static final Logger log = Logger.getLogger(QueueManagerImpl.class);

    private String queue;
    private ConnectionFactory connectionFactory;
    private JmsTemplate template;
    private Queue jmsQueue;
    private boolean useJta = false;

    /**
     * Creates the template with persistent delivery and, when the factory uses JTA,
     * transacted sessions. No default destination is set.
     *
     * @param queueConnectionFactory source of the JMS connection factory and JTA flag
     */
    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory) {

        template = new JmsTemplate();
        connectionFactory = queueConnectionFactory.getConnectionFactory();
        try {
            this.setUseJta(queueConnectionFactory.isUseJta());
            template.setConnectionFactory(connectionFactory);
            // Explicit QoS must be enabled for the delivery mode below to take effect.
            template.setExplicitQosEnabled(true);
            template.setDeliveryMode(DeliveryMode.PERSISTENT);
            if (queueConnectionFactory.isUseJta()) {
                template.setSessionTransacted(true);
            }
        } catch (Exception ex) {
            logError(ex.toString(), ex);
        }
    }

    /**
     * Creates the template as above and binds it to {@code queue} as default destination.
     *
     * @param queueConnectionFactory source of the JMS connection factory and JTA flag
     * @param queue                  name of the default queue
     */
    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory, String queue) {

        this(queueConnectionFactory);
        setQueue(queue);
    }

    public String getQueue() {

        return queue;
    }

    /**
     * Builds the queue via {@code HornetQJMSClient.createQueue} and makes it the
     * template's default destination. The stored queue name is only updated when
     * both steps succeed; failures are logged.
     *
     * @param queue name of the queue
     */
    public void setQueue(String queue) {

        try {
            jmsQueue = HornetQJMSClient.createQueue(queue);
            template.setDefaultDestination(jmsQueue);
            this.queue = queue;
        } catch (Exception ex) {
            logError(ex.toString(), ex);
        }
    }

    public JmsTemplate getTemplate() {
        return template;
    }

    /**
     * Logs a queue-connection failure.
     *
     * <p>The previous implementation passed the details to
     * {@code String.format("error...", details)}, a format string without a placeholder,
     * so the details never reached the log. They are now logged as-is.
     *
     * @param error description of the failure
     */
    public void logError(String error) {
        log.error(String.format("Unable to connect to queue, details: %s ", error));
    }

    /** Same as {@link #logError(String)} but keeps the stack trace of the cause. */
    private void logError(String error, Throwable cause) {
        log.error(String.format("Unable to connect to queue, details: %s ", error), cause);
    }

    @Override
    public boolean isUseJta() {
        return useJta;
    }

    @Override
    public void setUseJta(boolean useJta) {
        this.useJta = useJta;
    }
}
<hr/>

最主要的是,上面的代码需要将 ConnectionFactory 对象传递给 QueueManagerImpl 中的 JmsTemplate,即 template.setConnectionFactory(connectionFactory);

我尝试了几种方法来使其发挥作用:

1) 将以下内容添加到 jms-services.xml 文件中:

<!-- Attempt 1: let Spring's UserCredentialsConnectionFactoryAdapter supply the credentials.
     This fails at startup: targetConnectionFactory requires a javax.jms.ConnectionFactory,
     but the referenced queueConnectionFactory bean is a
     com.myproject.test.impl.QueueConnectionFactoryImpl, which Spring cannot convert. -->
<bean id="myConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="queueConnectionFactory"/>
    <property name="username" value="myuser"/>
    <property name="password" value="myuser123"/>
</bean>     

这会产生以下异常:

java.lang.IllegalStateException: Cannot convert value of type [com.myproject.test.impl.QueueConnectionFactoryImpl] to required type [javax.jms.ConnectionFactory] for property 'targetConnectionFactory': no matching editors or conversion strategy found

2) 将 QueueConnectionFactoryImpl 中的连接更改为:

// Attempt 2: create the HornetQ factory, then try to obtain an authenticated "factory" from it.
org.hornetq.jms.client.HornetQConnectionFactory HQConnectionFactory = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(jmsFType, transportConfiguration);

try {
    // createConnection(user, password) yields a connection (a HornetQConnection at runtime),
    // not a factory, so this cast throws ClassCastException. That exception is not a
    // JMSException and is therefore not handled by the catch below.
    connectionFactory = (ConnectionFactory) HQConnectionFactory.createConnection("myuser","myuser123");
} catch (JMSException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

这也行不通。我得到一个异常(exception):

java.lang.ClassCastException: org.hornetq.jms.client.HornetQConnection cannot be cast to javax.jms.ConnectionFactory

<hr/>

简而言之,有人能帮我想办法在提供用户名和密码的情况下让上述代码连接到 HornetQ,同时仍然可以使用 JmsTemplate 吗?

最佳答案

只需使用您自己的连接工厂即可。它对我有用:

spring bean(5445是hornetq接受器端口):

<!-- Connection factory that always authenticates with the configured username/password.
     The constructor-arg names must match the Java constructor parameter names exactly. -->
<bean name="jmsConnectionFactory" class="messaging.jms.CustomHornetQJMSConnectionFactory">
    <constructor-arg name="ha" value="false" /> <!-- set to true to enable failover support -->
    <!-- Comma-separated list of host:port entries, one per server. -->
    <constructor-arg name="commaSepratedServerUrls" value="127.0.0.1:5445" />
    <property name="username" value="admin" />
    <property name="password" value="admin" />
</bean>

连接工厂实现(使用来自 hornetq-jms-client 的 HornetQJMSConnectionFactory 和来自 hornetq-core-client 的 TransportConfiguration):

/**
 * HornetQ JMS connection factory whose no-argument {@link #createConnection()} always
 * authenticates with a configured username and password.
 *
 * <p>This lets callers that only ever call {@code createConnection()} (such as a
 * JmsTemplate configured with this factory) connect to a secured server.
 * Servers are given as a comma-separated list of {@code host:port} entries.
 */
public class CustomHornetQJMSConnectionFactory extends org.hornetq.jms.client.HornetQJMSConnectionFactory
{
    private static final long serialVersionUID = 1L;

    private String username;
    private String password;

    /**
     * @param ha                      whether the factory should support failover (HA)
     * @param commaSepratedServerUrls comma-separated {@code host:port} entries,
     *                                e.g. {@code "10.0.0.1:5445, 10.0.0.2:5445"}
     * @throws IllegalArgumentException if the server list is empty or an entry is malformed
     */
    public CustomHornetQJMSConnectionFactory(boolean ha, String commaSepratedServerUrls)
    {
        super(ha, converToTransportConfigurations(commaSepratedServerUrls));
    }

    /**
     * Converts a comma-separated {@code host:port} list into one Netty transport
     * configuration per entry.
     *
     * <p>Whitespace around entries is ignored. The port is taken after the last
     * {@code ':'}, so a bracketed IPv6 literal such as {@code [::1]:5445} is split correctly.
     * The port is stored as an {@code Integer} so a non-numeric value is rejected here
     * rather than when the connector is created.
     *
     * @param commaSepratedServerUrls comma-separated {@code host:port} entries
     * @return one transport configuration per entry, in input order
     * @throws IllegalArgumentException if the list is null/blank or an entry is not {@code host:port}
     */
    public static TransportConfiguration[] converToTransportConfigurations(String commaSepratedServerUrls)
    {
        if (commaSepratedServerUrls == null || commaSepratedServerUrls.trim().isEmpty())
        {
            throw new IllegalArgumentException("Server URL list must not be null or empty");
        }

        String[] serverUrls = commaSepratedServerUrls.split(",");
        TransportConfiguration[] transportconfigurations = new TransportConfiguration[serverUrls.length];
        for (int i = 0; i < serverUrls.length; i++)
        {
            String serverUrl = serverUrls[i].trim();
            int separator = serverUrl.lastIndexOf(':');
            // Require a non-empty host before and a non-empty port after the separator.
            if (separator <= 0 || separator == serverUrl.length() - 1)
            {
                throw new IllegalArgumentException(
                        "Expected host:port but got '" + serverUrl + "'");
            }

            String host = serverUrl.substring(0, separator).trim();
            int port;
            try
            {
                port = Integer.parseInt(serverUrl.substring(separator + 1).trim());
            }
            catch (NumberFormatException e)
            {
                throw new IllegalArgumentException(
                        "Invalid port in server URL '" + serverUrl + "'", e);
            }

            HashMap<String, Object> map = new HashMap<String, Object>();
            map.put(TransportConstants.HOST_PROP_NAME, host);
            map.put(TransportConstants.PORT_PROP_NAME, Integer.valueOf(port));
            transportconfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
        }
        return transportconfigurations;
    }

    /**
     * Creates a connection using the configured {@code username} and {@code password}
     * instead of anonymous credentials.
     */
    @Override
    public Connection createConnection() throws JMSException
    {
        return super.createConnection(username, password);
    }

    public String getUsername()
    {
        return username;
    }

    public void setUsername(String username)
    {
        this.username = username;
    }

    public String getPassword()
    {
        return password;
    }

    public void setPassword(String password)
    {
        this.password = password;
    }
}

现在,如果将此连接工厂提供给 jmsTemplate,则可以使用 user/pass 来发送/消费消息

关于java - JBoss EAP6 + HornetQ - 不确定如何创建到 HornetQ 的队列连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15835711/

相关文章:

java - 我试图根据用户输入设置整数范围

spring - application.yml 中的环境变量不适用于以空格开头的默认值

Spring Boot 中的 Spring 安全性

java - 在 Spring 配置中,用 XML 格式编写 Java 代码有什么好处?

java - 将用户身份验证详细信息从一台服务器传递到另一台服务器

java - JBoss EAP 7 和 Ifinispan

java - Spring boot Hadoop、Webhdfs 和 Apache Knox

Java Observer 和 Observable 在应用程序之间无法正常工作

java - 解析 HTML - 仅获取表行的子集

java - Jax-RS 中的可选 @PathParam