activemq - 正确关闭 ActiveMQ 和 Spring DefaultMessageListenerContainer

标签 activemq spring-jms

当 Tomcat 管理器发出“停止”命令时,我们的系统不会关闭。我已经确定它与 ActiveMQ/Spring 相关。我什至想出了如何让它关闭,但是我的解决方案是一个黑客(至少我希望这不是“正确”的方法)。我想知道关闭 ActiveMQ 的正确方法,以便我可以删除我的黑客。

我继承了这个组件,我不知道为什么做出某些架构决定,经过大量挖掘,我想我理解他的想法,但我可能会遗漏一些东西。换句话说,真正的问题可能在于我们尝试使用 ActiveMQ/Spring 的方式。

我们在 ServletContainer (Tomcat 6/7) 中运行并使用 ActiveMQ 5.9.1 和 Spring 3.0.0 我们的应用程序的多个实例可以在一个“组”中运行,每个实例运行在它自己的服务器上。 ActiveMQ 用于促进多个实例之间的通信。每个实例都有自己的嵌入式代理和自己的一组队列。每个实例上的每个队列都恰好有 1 个 org.springframework.jms.listener.DefaultMessageListenerContainer 监听它,例如,5 个队列 = 5 个 DefaultMessageListenerContainers。

我们的系统会正常关闭,直到我们通过将 queuePrefetch="0"添加到 ConnectionFactory 来修复错误。起初我认为这个更改在某种程度上是不正确的,但现在我了解了情况,我相信我们不应该使用预取功能。

我创建了一个测试应用程序来复制这个问题。请注意,以下信息未提及消息生产者。那是因为我可以在不发送/处理单个消息的情况下复制问题。在启动期间简单地创建 Broker、ConnectionFactory、Queues 和 Listeners 就足以防止系统正常停止。

这是我的 Spring XML 中的示例配置。如果有人想要,我很乐意提供我的整个项目:

<amq:broker persistent="false" id="mybroker"> 
 <amq:transportConnectors> 
  <amq:transportConnector uri="tcp://0.0.0.0:61616"/> 
 </amq:transportConnectors> 
</amq:broker> 

<amq:connectionFactory id="ConnectionFactory" brokerURL="vm://localhost?broker.persistent=false" > 
 <amq:prefetchPolicy> 
  <amq:prefetchPolicy queuePrefetch="0"/> 
 </amq:prefetchPolicy> 
</amq:connectionFactory> 

<amq:queue id="lookup.mdb.queue.cat" physicalName="DogQueue"/> 
<amq:queue id="lookup.mdb.queue.dog" physicalName="CatQueue"/> 
<amq:queue id="lookup.mdb.queue.fish" physicalName="FishQueue"/> 

<bean id="messageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" abstract="true"> 
 <property name="connectionFactory" ref="ConnectionFactory"/> 
</bean> 

<bean parent="messageListener" id="cat"> 
 <property name="destination" ref="lookup.mdb.queue.dog"/> 
 <property name="messageListener"> 
  <bean class="com.acteksoft.common.remote.jms.WorkerMessageListener"/> 
 </property> 
 <property name="concurrentConsumers" value="200"/> 
 <property name="maxConcurrentConsumers" value="200"/> 
</bean> 

<bean parent="messageListener" id="dog"> 
 <property name="destination" ref="lookup.mdb.queue.cat"/> 
 <property name="messageListener"> 
  <bean class="com.acteksoft.common.remote.jms.WorkerMessageListener"/> 
 </property> 
 <property name="concurrentConsumers" value="200"/> 
 <property name="maxConcurrentConsumers" value="200"/> 
</bean> 

<bean parent="messageListener" id="fish"> 
 <property name="destination" ref="lookup.mdb.queue.fish"/> 
 <property name="messageListener"> 
  <bean class="com.acteksoft.common.remote.jms.WorkerMessageListener"/> 
 </property> 
 <property name="concurrentConsumers" value="200"/> 
 <property name="maxConcurrentConsumers" value="200"/> 
</bean> 

我的技巧涉及使用 ServletContextListener 手动停止对象。 hacky 部分是我必须创建额外的线程来停止 DefaultMessageListenerContainers。也许我以错误的顺序停止对象,但我已经尝试了我能想象的一切。如果我试图停止主线程中的对象,那么它们将无限期地挂起。

先感谢您!

更新
我已经根据 boday 的建议尝试了以下方法,但没有奏效。我还尝试将 amq:transportConnector uri 指定为 tcp://0.0.0.0:61616?transport.daemon=true
  <amq:broker persistent="false" id="mybroker" brokerName="localhost">
   <amq:transportConnectors>
    <amq:transportConnector uri="tcp://0.0.0.0:61616?daemon=true"/>
   </amq:transportConnectors>
  </amq:broker>

  <amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost" >
   <amq:prefetchPolicy>
    <amq:prefetchPolicy queuePrefetch="0"/>
   </amq:prefetchPolicy>
  </amq:connectionFactory>

有一次,我尝试向 amq:connectionFactory 元素中的 brokerUrl 参数添加类似的属性,并且关闭正常工作,但是经过进一步测试,我了解到这些属性导致从 VMTransportFactory 抛出异常。这导致不正确的初始化和基本的消息功能不起作用。

最佳答案

尝试设置 daemon=true在您的 TCP 传输上,这允许进程作为守护线程运行,不会阻止容器关闭

http://activemq.apache.org/tcp-transport-reference.html

关于activemq - 正确关闭 ActiveMQ 和 Spring DefaultMessageListenerContainer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24049462/

相关文章:

java - JMSTemplate 检查主题是否存在并获取订阅者数量

java - org.apache.activemq.broker.TransportConnection serviceTransportException 警告 : java. io.EOFException

java - activemq 数据源的延迟初始化

java - 如何在ActiveMQ中抑制排序?

java - 当客户端停止工作时,JMS 有 javax.jms.InvalidDestinationException

java - 为什么 Spring 有 JdbcDaoSupport 类,但没有类似的 JmsSupport 类?

java - 如何在ssl中激活mq

java - Spring JMS 运行时连接管理

java - 是否可以在没有 QueueConnectionFactory 的情况下从 WAS JNDI 获取 JMS 队列引用?

java - 从监听器中获取 DefaultMessageListenerContainer