java - 当生产者的连接断开时如何收到通知?

标签 java activemq

我正在使用嵌入式 ActiveMQ 代理。 我的目标是找到一种方法来检测队列上的外部生产者何时失去连接。

我像这样启动经纪人:

BrokerService broker = new BrokerService();
broker.addConnector("tcp://" + LISTEN_DEVICE_IP + ":" + port);
setLastMessagesPersistent(broker);
broker.start();

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

connection.start();

之后我尝试添加一个 TransportListener:

((ActiveMQConnection) connection).addTransportListener(new TransportListener() {
    public void transportResumed() {
        System.out.println("resumed");
    }
    public void transportInterupted() {
        System.out.println("interrupted");
    }
    public void onException(IOException arg0) {
        System.out.println("ioexception: " + arg0);
    }
    public void onCommand(Object arg0) {
        System.out.println("command: " + arg0);
    }
});

我还注册了一个消费者和 ProducerListener,如下所示:

Destination dest = session.createQueue(queuename);
MessageConsumer consumer = session.createConsumer(dest);

ProducerEventSource source = new ProducerEventSource(connection, dest);
System.out.println("Setting Producer Listener");
source.setProducerListener(prodevent -> {
    System.out.println("producer status: " + prodevent.isStarted());
});
// Gets called from inside the broker's Thread and somehow causes deadlocks if I don't invoke this from the outside
new Thread(() -> {
    try {
        consumer.setMessageListener(new NetworkEventPlayerAdapter(objectMapper, event, gameEventManager, playerID));
    } catch (Exception e) {
        e.printStackTrace();
    }
}).start();

不幸的是,当我强制退出之前添加为生产者的另一个应用程序 (Alt+F4) 时,TransportListener 和 ProducerListener 都没有给我任何输出。不过经纪人肯定注意到了:

 WARN | Transport Connection to: tcp://127.0.0.1:58988 failed: java.net.SocketException: Connection reset
 WARN | Transport Connection to: tcp://127.0.0.1:58986 failed: java.net.SocketException: Connection reset

但是我没有找到在 Java 中获取这些事件回调的方法。 我还尝试在代理中设置自定义 IOExceptionHandler 并向连接添加 ExceptionListener。他们也从未被调用过。

最佳答案

您可以使用咨询主题 ActiveMQ.Advisory.Connection 甚至 ActiveMQ.Advisory.Producer.Queue ActiveMQ.Advisory.Producer.Topic ,这些提供有关生产者数量连接的统计信息,请查看此链接 http://activemq.apache.org/advisory-message.html

关于java - 当生产者的连接断开时如何收到通知?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34953466/

相关文章:

java - 如果删除 ListView 中的最后一项,则显示不同的布局

java - 如何解决不可变的 String[]

jms - ActiveMQ 5.11 代理网络和主/从组配置

java - 基于时间间隔的节流

java - 从远程机器访问 Grails ActiveMQ

java - jackson 的嵌套映射

java - 为什么我在 Java 中收到 NoClassDefFoundError 错误?

java - 创建不带参数的方法和创建字符串参数的方法之间的区别?

java - JMS 主题在队列监听器中接收

Node.js应用监听消息队列并异步添加消息到redis