我正在尝试使用 Spring-Integration JMS 的示例项目进行 JMS 集成,我已经成功完成了。然而我的要求略有不同。我需要使用发布订阅模式从 1 个 JMS 代理进行监听,并且需要将相同的监听 JMS 消息发送到另一个 Kafak 队列/或其他队列。我正在努力进行配置,到目前为止我只配置了请求和响应队列。这是配置。请帮忙。
Common.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd">
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost"/>
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>
<!-- <bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue.demo"/>
</bean> -->
<bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue.reply"/>
</bean>
<bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue.request"/>
</bean>
<integration:poller id="poller" default="true" fixed-delay="100"/>
</beans>
InboudChanelAdapter
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue"
channel="jmsInChannel" />
<channel id="jmsInChannel" />
<beans:beans profile="testCase">
<bridge input-channel="jmsInChannel" output-channel="queueChannel"/>
<channel id="queueChannel">
<queue />
</channel>
</beans:beans>
</beans:beans>
OutboundChannelAdapter.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
<stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/>
<channel id="stdinToJmsoutChannel"/>
<channel id="jmsInChannel" />
<jms:outbound-channel-adapter id="jmsout" channel="jmsInChannel" destination="requestQueue"/>
</beans:beans>
**DemoConfig.xml**
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost"/>
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>
<bean id="connectionFactory2nd" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost"/>
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>
<bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue" >
<constructor-arg value="queue.reply"/>
</bean>
<bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue.request"/>
</bean>
<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue"
channel="jmsInChannel"
connection-factory="connectionFactory"/>
<jms:publish-subscribe-channel id= "jmsInChannel"/>
<jms:topic id="Topic"></jms:topic>
</<jms:channel>
<jms:outbound-channel-adapter id="jmsout" channel="jmsInChannel" destination="replyQueue" connection-factory="connectionFactory2nd"/>
<integration:poller id="poller" default="true" fixed-delay="100"/>
</beans>
最佳答案
看起来你的理论知识还不够,所以你应该去看看有关 Spring Integration 的文档和书籍。你感觉不舒服是什么MessageChannel
还没有。
<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue"
channel="jmsInChannel" />
含义:收听requestQueue
目的地并将 Spring 集成消息发送到 jmsInChannel
.
如果您只是将该消息发送到另一个 JMS 目的地,您应该执行如下操作:
<jms:outbound-channel-adapter id="jmsout" channel="jmsInChannel" destination="replyQueue"/>
并确保没有更多订阅者 jmsInChannel
,因为它是 DirectChannel
.
根据您当前的配置,您有额外的订阅者 <bridge>
。在这种情况下 Round-Robin
平衡器的作用是jmsInChannel
第一条消息将发送给第一个订阅者,只有第二条消息将发送给第二个订阅者,依此类推。
如果您想接受两个订阅者的消息,您应该更改 jmsInChannel
到<publish-subscribe-channel>
.
您可以从文档中找到更多信息。
关于java - Spring 集成 JMS 在入站和出站中发布订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27897941/