java - ActiveMQ Artemis - javax.jms.IllegalStateException : AMQ219019: Session is closed

标签 java activemq-artemis

我有两个 ActiveMQ Artemis 代理运行在不同的机器上,形成一个简单的集群。我正在使用 Java 应用程序(非常基本)来生成和使用消息来分析集群的行为。 Java代码如下:

public void runExample() throws Exception {
    InitialContext initialContext = null;
    Connection connectionA = null;

    try {
        Properties properties = new Properties();
        properties.put("java.naming.factory.initial", "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
        properties.put("connectionFactory.ConnectionFactory", "udp://231.7.7.7:9876");
        properties.put("queue.queue/anotherExampleQueue", "anotherExampleQueue"); 

        initialContext = new InitialContext(properties);
        Queue queue = (Queue) initialContext.lookup("queue/anotherExampleQueue");
        ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory");

        Thread.sleep(5000);
        connectionA = connectionFactory.createConnection("admin", "admin");
        
        Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        System.out.println("Session A - " + ((ClientSessionInternal)((org.apache.activemq.artemis.jms.client.ActiveMQSession) sessionA).getCoreSession()).getConnection().getRemoteAddress());
        
        MessageProducer producerA = sessionA.createProducer(queue);
        
        final int numMessages = 10;

        for (int i = 0; i < numMessages; i++) {
            TextMessage messageA = sessionA.createTextMessage("A:This is text message " + i);
            producerA.send(messageA);
            System.out.println("Sent message: " + messageA.getText());
        }

        connectionA.start();
        consume(sessionA, queue, numMessages, "A");
        
    } finally {
        if (connectionA != null) {
            connectionA.close();
        }
        if (initialContext != null) {
            initialContext.close();
        }
    }
}

private static void consume(Session session, Queue queue, int numMessages, String node) throws JMSException {
    MessageConsumer consumer = session.createConsumer(queue);

    for (int i = 0; i < numMessages; i++) {
        TextMessage message = (TextMessage) consumer.receive(2000);
        
        if(message!=null)
            System.out.println("Got message: " + message.getText() + " from node " + node);
    }

    System.out.println("receive other message from node " + node + ": " + consumer.receive(2000));
}

connectionA.start() 处使用断点调试上述应用程序时。如果我停止我的主代理,那么我会看到从属代理接管并且所有消息都按预期移动到从属代理。但是,此时,如果我继续我的应用程序,则会抛出 javax.jms.IllegalStateException: AMQ219019: Session is closed而不是在从属代理上消费消息。当我再次启动主代理并继续调试时,也会发生同样的情况。 documentation表示自动客户端故障转移将自动发生。

这是主broker.xml的片段

<connectors>
   <connector name="clusterConnectorOne">tcp://10.10.170.5:61616</connector>
</connectors>

<discovery-groups>
   <discovery-group name="my-discovery-group">
      <local-bind-address>10.10.170.5</local-bind-address>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>

<cluster-connections>
   <cluster-connection name="my-cluster">
      <connector-ref>clusterConnectorOne</connector-ref>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <message-load-balancing>STRICT</message-load-balancing>
      <max-hops>1</max-hops>
      <discovery-group-ref discovery-group-name="my-discovery-group"/>
   </cluster-connection>
</cluster-connections>
      
<broadcast-groups>
   <broadcast-group name="my-broadcast-group">
      <local-bind-address>10.10.170.5</local-bind-address>
      <local-bind-port>5432</local-bind-port>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <broadcast-period>2000</broadcast-period>
      <connector-ref>clusterConnectorOne</connector-ref>
   </broadcast-group>
</broadcast-groups>

<ha-policy>
   <replication>
      <master>
        <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

我不知道这里出了什么问题,有什么建议吗?

最佳答案

由于您的连接工厂正在使用udp://231.7.7.7:9876,因此它正在“发现”最终需要使用的连接器。在您的情况下,连接器 tcp://10.10.170.5:61616 正在由您的代理广播,以便客户端将发现和使用。但是,此连接器未配置用于 HA。我需要类似于 tcp://10.10.170.5:61616?ha=true;reconnectAttempts=-1 ,以便告诉客户端在连接丢失时故障转移到备份。更新 broker.xml 中的连接器配置,故障转移应该可以正常工作。代理附带的许多高可用性示例都演示了此设置,例如transaction-failover .

关于java - ActiveMQ Artemis - javax.jms.IllegalStateException : AMQ219019: Session is closed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62842056/

相关文章:

ssl - Artemis 中 2-way SSL 和单一身份验证之间的区别

jms - 如何将 JMS 消息从 WildFly 10 发送到远程 ActiveMQ

java - 上传具有共享首选项的多个图像

javascript - 如何在 youtube 上获取直播视频的可下载网址

java - 在哪里可以找到使用 Apache ActiveMQ Artemis 中的 artemis create 创建的队列和地址

docker - 在许多 Docker 容器上配置 ActiveMQ Artemis 集群?

java - 使用 Wildlfy 11 嵌入的 Apache Artemis 接收 MQTT 消息

java - 为什么JDK Map.get不支持返回值的类型推断

java - Java中的条码图像生成器

java - 如何使用 Java 和 Apache POI 在 xls 中编写自定义尺寸的图像