java - 传输监听器和 ActiveMq 重新启动

标签 java jms activemq race-condition

我开发了 JMS 应用程序,现在我想添加支持代理重启的功能。我有动态主题,我应该在连接恢复后重新创建它们。另外,我应该有可能知道经纪人何时关闭以及何时恢复。 所以我尝试使用ActiveMQ故障转移协议(protocol)来实现这个功能。我实现 TransportListener 并在“transportInterrupted”方法中调用完全断开连接,例如

  public void disconnect() throws JMSException {
    System.out.println("!!!!!!!DISCONNECTING!!!!!!!!");
    consumer.close();
    session.close();
    session = null;
    messageProducer.close();
    messageProducer = null;
    connection = null;
    connected = false;
    System.out.println("!!!!!!!DISCONNECTED!!!!!!!!");
}

此后我的应用程序挂起,就像竞争条件一样。如果我 close() 仅生产者并将连接设置为 null 一切正常,但如果我尝试关闭消费者,它仅在 N 种情况中的 1 种情况下有效。我写了测试来证明这一点。我认为问题在于关闭消费者,但我没有找到任何信息我做错了什么。

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class FastFailProducer {
    volatile boolean connected = false;
    private ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?timeout=5000");;
    private static FailoverListener failoverListener;
    private Connection connection;
    private Session session;
    private Queue queue;
    private MessageProducer messageProducer;
    private MessageConsumer consumer;
    private String something;

public void init() throws JMSException {
    System.out.println("!!!!!!!CONNECTING!!!!!!!!");
    connection = factory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    connection.start();
    ((ActiveMQConnection) connection).addTransportListener(failoverListener);
    queue = session.createQueue("TEST");
    messageProducer = session.createProducer(queue);
    consumer = session.createConsumer(queue);
    System.out.println("!!!!!!!CONNECTING COMPLETE!!!!!!!!");
    connected = true;
}

public void disconnect() throws JMSException {
    System.out.println("!!!!!!!DISCONNECTING!!!!!!!!");
    consumer.close();
    session.close();
    session = null;
    messageProducer.close();
    messageProducer = null;
    connection = null;
    connected = false;
    System.out.println("!!!!!!!DISCONNECTED!!!!!!!!");
}

public void run() throws Exception {
    // send messages
    for (int i = 0; i < 1000; i++) {
        if (connected) {
            if (session != null & messageProducer != null & queue != null) {
                // send a message
                messageProducer.send(session.createTextMessage(i + " message"));
                System.out.println("Sent message " + i);
            }
        } else {
            // execute your backup logic
            System.out.println("Message " + i + " not sent");

        }
        Thread.sleep(1000);
    }

    messageProducer.close();
    session.close();
    connection.close();
    System.exit(0);
}

public static void main(String[] args) throws Exception {
    FastFailProducer failoverProducer = new FastFailProducer();
    failoverProducer.something = "asdfasdf";
    failoverListener = new FailoverListener(failoverProducer);
    failoverProducer.init();
    failoverProducer.setConnected(true);
    failoverProducer.run();

}

public boolean isConnected() {
    return connected;
}

public void setConnected(boolean connected) {
    this.connected = connected;
}
}

TransportListenerImpl 类

import java.io.IOException;

import javax.jms.JMSException;

import org.apache.activemq.transport.TransportListener;

public class FailoverListener implements TransportListener {
    private FastFailProducer failProducer;

public FailoverListener(FastFailProducer failProducer) {
    super();
    this.failProducer = failProducer;
}

@Override
public void onCommand(Object arg0) {
}

@Override
public void onException(IOException arg0) {

}

@Override
public void transportInterupted() {
    try {
        failProducer.disconnect();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

@Override
public void transportResumed() {
    System.out.println("!!!!!!!TRANSPORT RESUMED!!!!!!!!");
    if (!failProducer.isConnected()) {
        try {
            failProducer.init();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
}

最佳答案

我认为您忽略了使用故障转移协议(protocol)的要点。如果您使用故障转移,则无需关闭连接及其关联资源,因为故障转移传输将负责将客户端上的所有内容恢复为代理关闭之前的状态。在事件方法中关闭连接肯定会锁定,因为您不应该这样做。如果您想在代理消失时关闭所有内容,请不要使用故障转移,而是监听 JMS 异常监听器事件 Hook 。

关于java - 传输监听器和 ActiveMq 重新启动,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14172547/

相关文章:

java - Camel 消息自动过期

java - 企业应用集成: Ways to communicate show rooms with central office

jms - JMS 中的 Multi-Tenancy

java - Spring Security - 两个独立的身份验证信息源

java - 如何中止 JDBC Postgresql CopyManager 复制?

java - 使用旋转动画旋转正方形

启动时 Java GUI 应用程序框架无法正确显示?

java - 始终确保 ActiveMQ 主题中只有最后 10 条消息

ssl - 如何在 SSH 隧道中忽略主机名

java - JBWEB000289 JBoss 部署时出错