java - 在 Java 应用程序中使用 ActiveMQ 目的地(启动时)

标签 java jms activemq jndi

我已经在 ActiveMQ conf/activemq.xml 中配置了许多在启动时可用的目标。我可以看到队列和主题已在 ActiveMQ Web 控制台上创建。现在我想使用相同的队列和主题通过使用 JMS 的 Java 程序进行消息交换。下面是我正在使用的代码。但是,我在 ActiveMQ Web 控制台中看不到主题上排队的任何消息。

package com.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.log4j.BasicConfigurator;

public class Main {
public static void main(String[] args) {

    BasicConfigurator.configure();

    Destination destination =null;
    String destinationName = "MyTopic";
    final int numMsgs;
    Context jndiContext = null;

    // producer details
    ConnectionFactory connectionFactory = null;
    Connection connection = null;
    Session session = null;
    MessageProducer producer = null;

    //consumer details
    ConnectionFactory consumerConnectionFactory = null;
    Connection consumerConnection = null;
    Session consumerSession = null;
    MessageConsumer messageConsumer=null;

    /*
     * Create a JNDI API InitialContext object
     */
    try {
        jndiContext = new InitialContext();
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: " + e.toString());
        System.exit(1);
    }

    /*
     * Look up connection factory and destination.
     */
    try {
        connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
        destination = (Destination) jndiContext.lookup(destinationName);

    } catch (NamingException e) {
        System.out.println("JNDI API lookup failed: " + e);
        System.exit(1);
    }

    /*
     * Create connection. Create session from connection; false means
     * session is not transacted. Create sender and text message. Send
     * messages, varying text slightly. Send end-of-messages message.
     * Finally, close connection.
     */
    try {
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage();
        for (int i = 0; i < 2; i++) {
            message.setText("This is message " + (i + 1));
            System.out.println("Sending message: " + message.getText());
            producer.send(message);

        }

        /*
         * Send a non-text control message indicating end of messages.
         */
        producer.send(session.createMessage());

        /***
         * to recieve message
         * */

        try {
            consumerConnectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
            consumerConnection = consumerConnectionFactory.createConnection();
            consumerConnection.start();
            consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            messageConsumer = consumerSession.createConsumer(destination);
        } catch (NamingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        String greeting = "NO_GREETING";

        // read a message from the topic destination
        Message recievedMessage = messageConsumer.receive();
        System.out.println("******************" + recievedMessage);
        // check if a message was received
        if (recievedMessage != null) {
            // cast the message to the correct type
            TextMessage textMessage = (TextMessage) recievedMessage;

            // retrieve the message content
            String text = textMessage.getText();
            System.out.println(": received message with text='{}'" + text);

            // create greeting
            greeting = "Hello " + text + "!";
        } else {
            System.out.println(": no message received");
        }

        System.out.println("greeting={}" + greeting);

    } catch (JMSException e) {
        System.out.println("Exception occurred: " + e);
    } finally {
        if (connection != null && consumerConnection !=null) {
            try {
                connection.close();
                consumerConnection.close();
            } catch (JMSException e) {
            }
        }
    }
}

}

我的 JNDI 属性文件

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616/

# use the following property to specify the JNDI name the connection factory
# should appear as. 
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.MyQueue = example.MyQueue


# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.MyTopic = SOME.TOPIC

ActiveMQ/conf/activemq.xml文件中的ActiveMQ配置文件

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

     <destinations>
      <queue physicalName="FOO.BAR" />
      <topic physicalName="SOME.TOPIC" />
    </destinations>

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

在我的输出中,我可以看到下面的文本。

Sending message: This is message 1
Sending message: This is message 2

但是,我认为该消息没有放在主题上,因为 ActiveMQ Web 控制台上没有显示排队的消息。

任何人都可以帮助我确定这是否是在 java 启动时访问 ActiveMQ 目标的正确方法。我很清楚如何使用 JMS 和 ActiveMQ 通过 Java 创建队列和主题。但在这个项目中,我想访问程序中现有的队列和主题。

最佳答案

主题需要主动订阅才能传递消息,否则消息将被丢弃,这就是发布/订阅的本质。如果您想发送消息并稍后阅读,您应该使用队列,或者作为最后的手段使用持久主题订阅。如果你费心去谷歌一下,有很多关于这方面的教程。

关于java - 在 Java 应用程序中使用 ActiveMQ 目的地(启动时),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33094870/

相关文章:

Java - 将多个用户输入添加到 arraylist

java - activemq中未加载页面文件异常

filter - JMS 选择器与单独队列

java - Spring JMS 消息监听器容器

java - JMS如何读取队列上的多个文件

java - 安全的 JMS 队列连接 jboss

java - 如何使用 Spring WebSocket 向 STOMP 客户端发送错误消息?

java - EntityManager.remove() 不会生成删除查询并且不会删除实体

java - 使用struts2标签测试字符串长度

Java GMT 显示