java - 我有一个使用 http 和 https 协议(protocol)的 JMS - ActiveMQ

标签 java activemq

我的制作人:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    //127.0.0.1 loop back to localhost
private static String url="http://127.0.0.1:61616";
private static String subject="DemoQueue4";

public static void main(String[] args)throws JMSException {
        ConnectionFactory cf=null;
        Connection con = null;
        try {

        // getting jms connection from the server and starting it.
            System.out.println("Please wait connecting...");
        cf=new ActiveMQConnectionFactory(url);
        con=cf.createConnection();
        System.out.println("Successfully Connected \n");

        System.out.println("Please wait creating session...");
        Session s=con.createSession(false,Session.AUTO_ACKNOWLEDGE);    
        System.out.println("Session created \n");

        System.out.println("Please wait Creating Queue...");
        // create queue
        Destination d;
        d=s.createQueue(subject);
        System.out.println("Queue created \n");

        System.out.println("Please wait Creating Producer...");
        // create producer/sender
        MessageProducer mp;
        mp=s.createProducer(d);
        System.out.println("Producer created \n");

        System.out.println("Please wait Connection Starting...");
        con.start();
        System.out.println("Connection Started \n");

        System.out.println("Please wait Creating TextMessage..");
        // We will send a small text message saying 'Hello' in Japanese
        TextMessage message = s.createTextMessage("Hi How are you!");
        System.out.println("TextMessage Created \n");

        System.out.println("Please wait TextMessage Sending...");
        // Here we are sending the message!
        mp.send(message);
        System.out.println("TextMessage Sent '" + message.getText() + "'");
        System.out.println("Success");          

        }catch(Exception e) {
            e.printStackTrace();
        }finally{
            if(con!=null){
                con.close();
            }
        }

    }
}
/**i am trying to insert the text message to queue but it is not inserting**/

我的代理配置 - activemq.xml:

/**this is my config file of activemq.xml**/

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <!-- Allows log searching in hawtio console -->
    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->

            <transportConnector name="openwire" uri="http://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>




     <!-- 
     <transportConnector name="http" uri="http://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

     <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>


Activemq version 5.9
jdk 1.8
eclipseIDE

jars:

activemq-all.jar
httpclient.jar
httpcore.jar
commonslogging-1.2.jar
com.thoughtworks.xstream.jar
xmlpull-xpp3-1.1.4c.jar

运行时出现错误mp.send(message),这是一个一般的简单示例,您也可以在您的环境中复制粘贴

Please wait connecting...
log4j:WARN No appenders could be found for logger (org.apache.http.impl.conn.PoolingClientConnectionManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Successfully Connected 

Please wait creating session...
Session created 

Please wait Creating Queue...
Queue created 

Please wait Creating Producer...
Producer created 

Please wait Connection Starting...
Connection Started 

Please wait Creating TextMessage..
TextMessage Created 

Please wait TextMessage Sending...
javax.jms.JMSException: Could not post command: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:WSTIN0128-50902-1472648010012-2:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:WSTIN0128-50902-1472648010012-2:1:1:1, destination = queue://DemoQueue4, transactionId = null, expiration = 0, timestamp = 1472648011069, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@67d48005, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Diyotta-http!} due to: java.net.SocketTimeoutException: Read timed out
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:72)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1423)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1333)
    at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1811)
    at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:289)
    at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:224)
    at org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:241)
    at com.kumar.jmsproducer.Producer.main(Producer.java:57)
Caused by: java.io.IOException: Could not post command: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:WSTIN0128-50902-1472648010012-2:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:WSTIN0128-50902-1472648010012-2:1:1:1, destination = queue://DemoQueue4, transactionId = null, expiration = 0, timestamp = 1472648011069, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@67d48005, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Diyotta-http!} due to: java.net.SocketTimeoutException: Read timed out
    at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33)
    at org.apache.activemq.transport.http.HttpClientTransport.oneway(HttpClientTransport.java:138)
    at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:304)
    at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1394)
    ... 6 more
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
    at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
    at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260)
    at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
    at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
    at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
    at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685)
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487)
    at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:106)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
    at org.apache.activemq.transport.http.HttpClientTransport.oneway(HttpClientTransport.java:125)
    ... 12 more

关于ActiveMQ

HTTP 和 HTTPS 传输用于使用 XML 负载通过 HTTP 或 HTTPS 建立隧道。这允许 ActiveMQ 客户端和代理通过 HTTP 建立隧道,避免任何防火墙问题。

如果客户端不是 JMS,您可能需要查看 REST 或 Ajax 支持。 请注意,HTTP 传输位于 activemq-Optional jar 中。

ActiveMQ 使用 Jetty 的服务器和 SslSocketConnector 对象的组合通过 HTTPS 传输进行通信。使用 HTTPS 时,相应 SSL 证书和/或 key 的不正确配置很可能会导致此 nabble 线程中描述的 Jetty 无限循环问题。可以在此处找到有关创建和配置 key 和证书的良好引用。

<小时/>
BrokerStartup.java

package com.kumar.httpsprotocol;

import java.net.URI;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;

public class BrokerStartup {

    private static final String KEYSTORE = "i:/apache-activemq-5.9.0/conf/broker.ks";
    private static final String TRUST_KEYSTORE = "i:/apache-activemq-5.9.0/conf/broker.ts";
    private static final String PASSWORD = "123456";

    static BrokerService broker = new BrokerService();

    public static void startBorker(String host,String port) throws Exception{

        System.setProperty("javax.net.ssl.keyStore", KEYSTORE);
        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);

        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI("https://"+host+":"+port));
        broker.addConnector(connector);
        broker.start();
        System.out.println("Broker Started.."+broker.getBrokerName());

    }

    public static void main(String args[]){
        try {
            startBorker("HostName","61616");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
--------------------------------------------------------------------------------
Client Code: (Producer.java)

package com.kumar.httpsprotocol;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    private static final String KEYSTORE = "i:/apache-activemq-5.9.0/conf/broker.ks";
    private static final String TRUSTSTORE = "i:/apache-activemq-5.9.0/conf/client.ts";
    private static final String PASSWORD = "123456";    

    public static void main(String[] args)throws JMSException {

        System.setProperty("javax.net.ssl.keyStore", KEYSTORE);
        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.trustStore", TRUSTSTORE);
        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);

        ActiveMQConnectionFactory cf=null;
        Connection con = null;

        try {

            System.out.println("Please wait connecting...");
            cf=new ActiveMQConnectionFactory("https://HostName:61617");
            con=cf.createConnection();

            /*cf.setTrustStore("I:/apache-activemq-5.9.0/conf/client.ts");
            cf.setTrustStorePassword("123456");*/

            System.out.println("Successfully Connected \n");
            Session s=con.createSession(false,Session.AUTO_ACKNOWLEDGE);    
            System.out.println("Session created \n");
            Destination d=s.createQueue("DemoQueue7");
            MessageProducer mp=s.createProducer(d);
            con.start();
            System.out.println("Please wait Creating TextMessage..");
            TextMessage message = s.createTextMessage("Recieved from Kumar HTTPS Protocol!");
            System.out.println("TextMessage Created \n");
            System.out.println("Please wait TextMessage Sending...");
            mp.send(message);
            System.out.println("TextMessage Sent '" + message.getText() + "'");

        }catch(Exception e) {
            e.printStackTrace();
        }finally{
            if(con!=null){
                con.close();
            }
        }

    }
}
<小时/>

activemq.xml

 <transportConnector name="openwire" uri="https://HostName:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

错误:启动代理时

Exception in thread "main" java.lang.NoSuchMethodError: org.eclipse.jetty.server.ssl.SslConnector.getSslContextFactory()Lorg/eclipse/jetty/util/ssl/SslContextFactory;
    at org.apache.activemq.transport.SecureSocketConnectorFactory.createConnector(SecureSocketConnectorFactory.java:65)
    at org.apache.activemq.transport.https.HttpsTransportServer.doStart(HttpsTransportServer.java:36)
    at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
    at org.apache.activemq.broker.TransportConnector.start(TransportConnector.java:243)
    at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2501)
    at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2415)
    at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:666)
    at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:632)
    at org.apache.activemq.broker.BrokerService.start(BrokerService.java:568)
    at com.kumar.httpsprotocol.BrokerStartup.startBorker(BrokerStartup.java:26)
    at com.kumar.httpsprotocol.BrokerStartup.main(BrokerStartup.java:33)

最佳答案

你的代码没问题,代理配置也没有问题,这似乎是一个库版本问题,你能否验证客户端和代理端使用的 jar 版本是否相同,特别是对于这些: httpclient-4.2.5.jar httpcore-4.2.4.jar xstream-1.4.4.jar xpp3-1.1.4c.jar

关于java - 我有一个使用 http 和 https 协议(protocol)的 JMS - ActiveMQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39250490/

相关文章:

java - 在 JFrame 中手动定位图像

Java spring - 从 ajax post 到 Controller 收到 415 错误

java.lang.NoClassDefFoundError : Could not initialize class org. apache.commons.logging.LogFactory

jboss - Camel 应用程序在关闭 session 后尝试使用 JMS session

ActiveMQ版本与jre 1.5冲突

java - 使用 JMS 重试机制记录异常情况

spring - 这很可能在 tomcat 7.0.x 中造成内存泄漏

java - 对 ArrayList 进行排序?

java - 如何将 ActiveMQ 生产者连接到 OpenMQ JMS 代理

java - 基于GWT的开源电子商务解决方案