java - 服务器重启后将消息保留在主题中

标签 java jms activemq spring-integration

我正在学习 Spring Integration JMS。我遇到了一个问题,我的主题不会保留客户端尚未使用的待处理消息。

基本上,我启动 ActiveMQ,然后使用 REST 客户端调用生产者发送消息 50 次,以便 50 条消息在主题中排队。在消费者端,我应用了 5 秒的 sleep 计时器,以便每条消息以 5 秒的固定间隔被消费。然后在这期间我停止了 ActiveMQ。同时,客户端会消耗一些消息,假设 50 条消息中有 15 条已被消耗。然后,如果我重新启动 ActiveMQ,我预计主题会保留待处理的 35 条消息,但我在管理控制台的“主题”选项卡下看不到它。

这是我的配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:oxm="http://www.springframework.org/schema/oxm"
       xmlns:int-jme="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">


    <!-- Component scan to find all Spring components -->
    <context:component-scan base-package="com.geekcap.springintegrationexample" />

    <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="order" value="1" />
        <property name="messageConverters">
            <list>
                <!-- Default converters -->
                <bean class="org.springframework.http.converter.StringHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.FormHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.ByteArrayHttpMessageConverter" />
                <bean class="org.springframework.http.converter.xml.SourceHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.BufferedImageHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
            </list>
        </property>
    </bean>

    <!-- Define a channel to communicate out to a JMS Destination -->
    <int:channel id="topicChannel"/>

    <!-- Define the ActiveMQ connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--
        Define an adaptor that route topicChannel messages to the myTopic topic; the outbound-channel-adapter
        automagically fines the configured connectionFactory bean (by naming convention
      -->
    <int-jms:outbound-channel-adapter channel="topicChannel"
                                      destination-name="topic.myTopic"
                                      pub-sub-domain="true" />

    <!-- Create a channel for a listener that will consume messages-->
    <int:channel id="listenerChannel" />

    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                            channel="getPayloadChannel"
                                            destination-name="topic.myTopic"
                                            pub-sub-domain="true" />

    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />

    <int:channel id="getPayloadChannel" />

    <int:service-activator input-channel="getPayloadChannel" output-channel="listenerChannel" ref="retrievePayloadServiceImpl" method="getPayload" />

</beans>

我还了解到默认模式是持久的。但就我而言,它似乎不起作用。

编辑:

根据添加属性后 Gary Russel 给出的答案

  • 订阅持久=“true”
  • durable-subscription-name="mySubscription"

<int-jms:message-driven-channel-adapter>我面临 XML 相关问题

  • cvc-complex-type.3.2.2:属性“subscription-durable”不允许出现在元素“int-jms:message-driven-channel-adapter”中。

  • cvc-complex-type.3.2.2:属性“durable-subscription-name”不允许出现在元素“int-jms:message-driven-channel-adapter”中。

enter image description here

请帮忙

最佳答案

这就是主题的工作方式,默认情况下,阅读 JMS 规范。

主题是发布/订阅;只有在场的订阅者才能收到该消息。

如果发布了5个,则启动消费者,再发布5个;他只会得到第二个 5。

如果你在经纪人获得全部 5 之前杀死他;在重新启动期间,代理发现没有消费者,因此他清除了消息。

您可以通过使用持久订阅来更改此行为,在这种情况下,代理确实会保留每个此类订阅的消息,即使当前未连接。

要使用 Spring Integration 进行配置,请在消息驱动 channel 适配器上设置 subscription-durable 并为其指定一个唯一的subscription-name

关于java - 服务器重启后将消息保留在主题中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33146375/

相关文章:

java - AES - 生成 key 并加载它会产生无效 key 异常

java - 解析html代码还是使用java的正则表达式?

java - StringEncoder是否随机分割一个极长的字符串?

java - 我们如何从 java 运行 python 脚本(使用 nltk 和 scrapy)

java - SLF4J 链接错误

spring - Weblogic 服务器上带有 Spring 的 Camel JMS 组件

java - 多线程 JMS 客户端 ActiveMQ

javascript - activemq 的工作原理

java - JMS关闭客户端资源(MessageConsumer、Session、Connection)

java - 嵌入式 ActiveMQ 代理需要哪些依赖项?