java - Spring 集成 JMS 在入站和出站中发布订阅

标签 java spring spring-integration

我正在使用 Spring Integration JMS 的示例项目尝试进行 JMS 集成,并且已经成功跑通。不过我的需求略有不同:我需要以发布/订阅模式监听 1 个 JMS 代理,并把监听到的同一条 JMS 消息转发到另一个 Kafka 队列或其他队列。我在配置上遇到了困难,到目前为止只配置好了请求队列和响应队列。以下是我的配置,请帮忙看看。

Common.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Shared infrastructure for the JMS flows: one caching connection factory,
    the request/reply queue destinations and the default poller.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:integration="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <!-- Caching wrapper around an ActiveMQ connection factory pointed at vm://localhost.
         Up to 10 sessions are cached; producer caching is switched off. -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory"
          p:sessionCacheSize="10"
          p:cacheProducers="false">
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory"
                  p:brokerURL="vm://localhost"/>
        </property>
    </bean>

    <!-- Queue destination named "queue.request". -->
    <bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue.request</value>
        </constructor-arg>
    </bean>

    <!-- Queue destination named "queue.reply". -->
    <bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue.reply</value>
        </constructor-arg>
    </bean>

    <!-- Default poller with a fixed delay of 100 between polls. -->
    <integration:poller id="poller" default="true" fixed-delay="100"/>

</beans>

InboundChannelAdapter.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Inbound side: listens on the requestQueue destination and publishes each
    received JMS message to jmsInChannel.
-->
<beans:beans xmlns="http://www.springframework.org/schema/integration"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"
    xmlns:stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/integration/stream
            http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <!-- The connection factory is referenced explicitly (as DemoConfig.xml does)
         so the adapter does not depend on the namespace's default bean name. -->
    <jms:message-driven-channel-adapter id="jmsIn"
            connection-factory="connectionFactory"
            destination="requestQueue"
            channel="jmsInChannel" />

    <!-- Plain channel that receives the messages produced by jmsIn. -->
    <channel id="jmsInChannel" />

    <!-- Only active under the "testCase" profile: bridges jmsInChannel into a
         queue-backed channel. -->
    <beans:beans profile="testCase">

        <bridge input-channel="jmsInChannel" output-channel="queueChannel"/>

        <channel id="queueChannel">
            <queue />
        </channel>

    </beans:beans>

</beans:beans>

OutboundChannelAdapter.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
        Outbound side:
          * text typed on stdin is sent to the requestQueue destination;
          * messages arriving on jmsInChannel are forwarded to the replyQueue destination.
    -->
    <beans:beans xmlns="http://www.springframework.org/schema/integration"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:beans="http://www.springframework.org/schema/beans"
        xmlns:jms="http://www.springframework.org/schema/integration/jms"
        xmlns:stream="http://www.springframework.org/schema/integration/stream"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/integration
                http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jms
                http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/integration/stream
                http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

        <stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/>

        <channel id="stdinToJmsoutChannel"/>
        <channel id="jmsInChannel" />

        <!-- Gives stdinToJmsoutChannel a consumer: without one, every line read
             from stdin would fail to be dispatched. -->
        <jms:outbound-channel-adapter id="stdinToJms"
                channel="stdinToJmsoutChannel"
                destination="requestQueue"
                connection-factory="connectionFactory"/>

        <!-- Forwards to replyQueue rather than requestQueue. Writing back to
             requestQueue would re-deliver each message to a listener on that
             same queue that feeds jmsInChannel, producing an endless loop. -->
        <jms:outbound-channel-adapter id="jmsout"
                channel="jmsInChannel"
                destination="replyQueue"
                connection-factory="connectionFactory"/>

    </beans:beans>
 **DemoConfig.xml**

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

    <!-- Connection factory used by the inbound (listening) adapter. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="vm://localhost"/>
            </bean>
        </property>
        <property name="sessionCacheSize" value="10"/>
        <property name="cacheProducers" value="false"/>
    </bean>

    <!-- Connection factory used by the outbound adapter.
         NOTE(review): it uses the same brokerURL as connectionFactory; change the
         URL here if messages are meant to go to a different broker (to be confirmed). -->
    <bean id="connectionFactory2nd" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="vm://localhost"/>
            </bean>
        </property>
        <property name="sessionCacheSize" value="10"/>
        <property name="cacheProducers" value="false"/>
    </bean>

    <!-- Queue destination named "queue.reply". -->
    <bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue.reply"/>
    </bean>

    <!-- Queue destination named "queue.request". -->
    <bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue.request"/>
    </bean>

    <!-- Listens on requestQueue and publishes every received message to jmsInChannel. -->
    <jms:message-driven-channel-adapter id="jmsIn"
        destination="requestQueue"
        channel="jmsInChannel"
        connection-factory="connectionFactory"/>

    <!-- Publish-subscribe channel from the core integration namespace: every
         subscriber of jmsInChannel receives each message. This replaces the
         previous markup, which was not well-formed XML. -->
    <integration:publish-subscribe-channel id="jmsInChannel"/>

    <!-- Subscriber that forwards each message to replyQueue through the second
         connection factory. -->
    <jms:outbound-channel-adapter id="jmsout"
        channel="jmsInChannel"
        destination="replyQueue"
        connection-factory="connectionFactory2nd"/>

    <!-- Default poller with a fixed delay of 100 between polls. -->
    <integration:poller id="poller" default="true" fixed-delay="100"/>

</beans>

最佳答案

看起来你的理论知识还不够,所以你应该去看看有关 Spring Integration 的文档和书籍。你似乎还不太清楚 MessageChannel 是什么。

<!-- Receives from requestQueue and hands each message to jmsInChannel. -->
<jms:message-driven-channel-adapter
        id="jmsIn"
        channel="jmsInChannel"
        destination="requestQueue"/>

含义:监听 requestQueue 目的地,并将 Spring Integration 消息发送到 jmsInChannel。

如果您只是将该消息发送到另一个 JMS 目的地,您应该执行如下操作:

<!-- Consumes from jmsInChannel and sends each message to replyQueue. -->
<jms:outbound-channel-adapter
        id="jmsout"
        destination="replyQueue"
        channel="jmsInChannel"/>

并确保没有更多订阅者 jmsInChannel ,因为它是 DirectChannel .

根据您当前的配置,jmsInChannel 上还有一个额外的订阅者 <bridge>。在这种情况下,jmsInChannel 上的轮询(Round-Robin)负载均衡器会生效:第一条消息发送给第一个订阅者,第二条消息才发送给第二个订阅者,依此类推。

如果您想让两个订阅者都收到同一条消息,您应该将 jmsInChannel 改为 <publish-subscribe-channel>。

您可以从文档中找到更多信息。

关于java - Spring 集成 JMS 在入站和出站中发布订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27897941/

相关文章:

java - 如何将 Vector 放入 intent.extra?

java - 在 ubuntu 中创建跨平台应用程序

java - 在基于 Spring 的 Java 服务器上管理与 Web 客户端的异步/推送通信的最佳方式

spring - 从Spring依赖管理插件中提取依赖版本

java - Spring 集成: Allocate Space when sending file to FTP

java - Spring Integration 错误 channel 未收到所有异常

java - 检查用户是否是 java 应用程序中的 root

java - iText7 将 SVG 添加到 PdfDocument 中以及可能出现的问题

java - 如何在 spring security 中正确注销用户

jackson 无法识别的字段异常,但字段在 JSON 中