java - ActiveMQ 和嵌入式代理

标签 java jms activemq

编辑:改写问题:

我想使用 ActiveMQ 作为我的服务器和客户端应用程序之间的信使服务。

我正在尝试在服务器中设置一个嵌入式代理(即不是一个单独的进程)来处理生成的消息供我的客户使用。这个队列是持久化的。

代理初始化如下:

BrokerService broker = new BrokerService();
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
adaptor.setDirectory(new File("activemq"));
broker.setPersistenceAdapter(adaptor);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
broker.start();

经过修改,我最终得到的服务器部分是:

public static class HelloWorldProducer implements Runnable {
    public void run() {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need
            Connection connection = connectionFactory.createConnection(); 
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("TEST.FOO");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);
            session.close();
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

客户端非常相似,看起来像这样:

public static class HelloWorldConsumer implements Runnable, ExceptionListener {
    public void run() {
        try {
          ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost");
            Connection connection = connectionFactory.createConnection(); // exception happens here...
            connection.start();
            connection.setExceptionListener(this);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("TEST.FOO");
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("*****Received: " + text);
            } else {
                System.out.println("*****Received obj: " + message);
            }
            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

main 方法只是在一个线程中启动其中的每一个以开始生成/接收消息。

...但是我在每个线程的开头都遇到了以下问题:

2013-01-24 07:54:31,271 INFO  [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost)
2013-01-24 07:54:31,281 INFO  [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost
2013-01-24 07:54:31,302 INFO  [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state
2013-01-24 07:54:31,339 INFO  [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: []
2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService
2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry
    java.rmi.server.ExportException: internal error: ObjID already in use
    at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186)
    at sun.rmi.transport.Transport.exportObject(Transport.java:92)
    at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247)
    at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
    at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
        <snip....>

似乎消息已成功生成和使用(我之前发布的其他问题已解决),但上述异常让我担心。

编辑:在代理关闭期间,我现在也受到以下欢迎:

2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception:
    java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
    at java.lang.Thread.run(Thread.java:722)

最佳答案

您可以通过多种方式将代理嵌入到您的代码中,其中大部分已记录在案 here .您可能想尝试升级您的版本,因为您使用的版本似乎很旧,因为它默认为现在已弃用的 AMQ 商店,而不是更新的 KahaDB 商店。您可能会因为客户端线程之间的竞争而遇到问题,因为它们使用不同的连接工厂,这些连接工厂可能会竞相在 VM 代理中创建。如果您在生产者上设置 create=false 选项并确保消费者线程在这之后启动可以解决问题,或者您可以提前创建 VM 代理并将 create=false 添加到两个线程,这可能会解决问题。

BrokerService broker = new BrokerService();
// configure the broker
broker.setBrokerName("localhost");
broker.setUseJmx(false);
broker.start();

然后在客户端代码中通过这个连接工厂配置附加。

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");

关于java - ActiveMQ 和嵌入式代理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14480023/

相关文章:

java - 开始接收短信 Activity

java - JMS-从远程 Glassfish 接收消息

c++ - 为 ARM 交叉编译 ActiveMQ

java - 在 ActiveMQSslConnectionFactory 中以编程方式设置信任库似乎失败

java - 等待异步任务的安全有效方式

java - 无法为对象堆 liferay tomcat 保留足够的空间

java - 将字符串转换为小写,但所有元字符除外

java - 将消息从一个队列复制到另一个队列

Delphi+消息队列

amazon-ec2 - 是否有提供 Amazon SQS 高可用性的 FIFO 消息队列服务?