java - Spring 集成 : Message released twice after delay

标签 java multithreading delay spring-integration delayed-execution

我正在使用以下 XML 片段:

<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
                                  queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
    <int:delayer id="commandDelayer" default-delay="30000"/>
    <int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>

<int:payload-type-router input-channel="commands">
....
....

它正在执行这些任务:

  1. 使用来自名为“commands”的 RabbitMQ 队列的消息。
  2. 将消息执行延迟 30 秒。
  3. 在指定延迟后继续执行消息。

如果在使用上述代码的应用程序启动之前消息已经存在于命令队列中,则应用程序在启动时会在单独的线程中执行消息两次。

我想我知道为什么会这样。

一旦应用程序上下文完全初始化,Spring 就会重新安排在 DelayHandler 的消息存储中持久保存的消息。请引用以下来自 DelayHandler.java 的代码片段:

public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!this.initialized.getAndSet(true)) {
        this.reschedulePersistedMessages();
    }
}

因此,如果消息在应用程序启动之前已经存在于 RabbitMQ 队列中,则在 Spring 上下文初始化期间,消息将从队列中拾取并添加到 DelayHandler 的消息存储中。上下文初始化完成后,如果在此期间消息未从消息存储中释放,则上述代码片段会重新安排同一消息。

现在,当两个独立的线程正在执行同一条消息时,如果一个线程已执行,则应从消息存储中删除该消息,而另一个线程不应继续执行。

当线程执行时,DelayHandler.java 中的下面一段代码允许第二个线程释放重复的消息,导致对同一条消息重复执行,因为消息存储是一个实例SimpleMessageStore 并且没有进一步的检查来停止执行。

private void doReleaseMessage(Message<?> message) {
    if (this.messageStore instanceof SimpleMessageStore
            || ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
        this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
        this.handleMessageInternal(message);
    }
    else {
        if (logger.isDebugEnabled()) {
            logger.debug("No message in the Message Store to release: " + message +
                    ". Likely another instance has already released it.");
        }
    }
}

这是 Spring Integration 中的错误吗?

最佳答案

哦,好吧!

那真是个可爱的 bug 。

感谢您指出这一点!

请提出 JIRA issue我们将在下一个版本中解决这个问题。

我可以解释发生了什么。

所有 Spring Integration 从 Lifecycle.start() 开始工作.在你的情况下 <int-amqp:inbound-channel-adapter>从 RabbitMQ 接收消息并将其发送到集成流。他们是delayed .

并且仅在 start 之后应用程序上下文引发 ContextRefreshedEvent .捕获那个甚至DelayHandlermessageStore 中获取所有消息并且,正如您所说,reschedules他们。

因此,是的,对于同一条消息,我们可能有两个计划任务。

有趣的是它只适用于SimpleMessageStore ,因为它没有 removeMessage存储到 groups 的消息的函数.

我看到了几种变体作为解决方法:

  1. 延迟 start对于 <int-amqp:inbound-channel-adapter> .例如,处理相同的 ContextRefreshedEvent来自 <inbound-channel-adapter>并发送@amqpAdapter.start()命令消息到 <control-bus>

  2. 另一个选项自 Spring Integration 4.1 起可用,其名称为 Idempotent Receiver .使用它你可以丢弃 duplicate消息,我猜 idempotentKey正是messageId .干净的幂等接收器模式!

  3. 还有一个选项在persistentMessageStore ,我们真正可以依靠的地方 removeMessage操作。

有关此事的 JIRA 票证:https://jira.spring.io/browse/INT-3560

关于java - Spring 集成 : Message released twice after delay,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26895123/

相关文章:

java - GWT 历史 JavaScript 在 Internet Explorer 中无法运行

java - 了解hibernate缓存

java - 在新窗口中查找 xpath 位置,单击后打开。使用 java 和 selenium

ios - Xcode 6 - 线程 1 SIGABRT 错误

c++ 11 - 将成员函数传递给线程给出 : no overloaded function takes 2 arguments

delay - 我们可以为 dspic33ep256mu814 Controller 的 mplab 中的 xc16 中的 _delay_ms 添加延迟函数的限制是多少?

c - 使用 PIC32 的定时器 2 设置延迟

java - Java Swing Timer 的随机时间间隔?

java - JOptionPane.showInputDialog() 问题

java - 多线程中的监听器仅在匿名传递时调用,而不是在声明为成员时调用