java - 使用 Apache Camel 和 ActiveMQ 时如何处理系统崩溃

标签 java spring-boot apache-camel activemq

我使用 Apache Camel 和 ActiveMQ 来执行大小为 1000 的批处理。我的理解是 Apache Camel 将从队列中检索 1000 个数据并对其进行处理,然后一旦处理完成,它将推送到 TO 队列。我的问题是,如果处理这些数据的系统在推送 TO 队列之前崩溃,会发生什么情况。该数据是否保留在 ActiveMQ 中,或者永远丢失。

在没有Apache Camel的场景中,我们可以为监听器配置不同的确认模式,例如CLIENT_ACKNOWLEDGE。

Apache Camel 中有类似的东西吗?下面是我的 JMS Config 类

public class JMSConfig {

    private Environment env;

    public JMSConfig(Environment env) {
        this.env = env;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        return connectionFactory;
    }

    @Bean
    public JmsTransactionManager geJmsTransactionManager(final ConnectionFactory connectionFactory) {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
        jmsTransactionManager.setConnectionFactory(connectionFactory);
        return jmsTransactionManager;
    }

    @Bean
    public JmsComponent createJmsComponent(final ConnectionFactory connectionFactory, final JmsTransactionManager jmsTransactionManager) {
        JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(connectionFactory, jmsTransactionManager);
        return jmsComponent;
    }
}

最佳答案

我不太明白“大小为 1000 的批处理”的含义,但听起来您消耗了 1000 条消息,将它们提交给代理,然后继续处理它们。这就是问题的根源。

如果您希望避免消息丢失,则必须使用来自代理的事务,并且在消息“安全”之前不要提交消息

最简单的例子:

from("activemq:queue:myInput")
.to("activemq:queue:myOutput")

如果 JMS 组件正确配置为使用事务处理,即使您在处理过程中终止 Camel 应用程序,也不会丢失此路由中的消息

这是因为代理会保留消息直到它们被提交。 Camel 在将消息交给 myOutput 之前就提交了消息。因此这是完全安全的。

但是,当您消费一批 1000 条消息时,我假设它们在消费后立即提交,因此在代理上删除。因此它们不再持久存在任何地方,只存在于 Camel 应用程序的内存中。如果它下降了,他们就会迷失。

请注意,您不需要 Spring TxManager 来消费事务。查看Camel docs about transacted JMS ,特别是关于事务式 DMLC 的点(Camel JMS 使用 Springs MessageListenerContainer)。这会减少您的配置,并且您也不需要将您的 Camel 路由标记为已处理。

顺便说一句,如果您的路由包含有状态组件(例如聚合器),事情就会变得复杂,因为这些组件引入了新的事务边界。

关于java - 使用 Apache Camel 和 ActiveMQ 时如何处理系统崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59279870/

相关文章:

java - Java 中的 RxTx Serial Com,平台无关

java - 清除给定的命令行字符串以匹配文件路径

spring webflux webclient响应将字符串列表转换为字符串

java - Camel 类型转换器和对象继承

apache-camel - Apache Camel : Exhausted after delivery attempt -> how debug?

java - 如何重写以下方法而不出现未经检查的警告?

java - 如何在 Genymotion 上安装 Twitter 和 Facebook 等其他应用程序

java - 使用 JaCoCo 和 spring-boot-maven-plugin 生成代码覆盖率

java - : SpringBoot方法中Principal是如何注入(inject)的

java - "Simple"表达式语言 - 算术运算符?