java - 在 IntegrationFlow 服务激活器方法成功返回之前不确认 RabbitMQ 消息?

标签 java spring spring-integration spring-rabbit spring-integration-amqp

我的集成流程定义如下:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

serviceActivatorBean 的定义如下:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(Collection<MyEvent> events) {
        ....
    }    
}

并且 requestHandlerRetryAdviceForIntegrationFlow() 的定义如下:

  public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(MAX_VALUE);
    retryTemplate.setRetryPolicy(retryPolicy);
    retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() {
        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(),
                    context.getRetryCount(), throwable);
        }
    }});
    advice.setRetryTemplate(retryTemplate);
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setMaxInterval(5000L);
    backOffPolicy.setInitialInterval(200L);
    backOffPolicy.setMultiplier(2);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return advice;
}

我们面临的问题是,当服务激活器中的events集合包含2个或更多事件时,并且由于某种原因myMethod的处理失败并且服务器崩溃。似乎发生的情况是,IntegrationFlow 一次使用并确认来自 RabbitMQ 的一条消息,因此,如果服务器在处理 myMethod 期间崩溃,除了最后一个事件之外的所有事件都会丢失。这对我们来说既不好又不够安全。我们可以做些什么来将 IntegrationFlow 配置为在服务激活器中的 myMethod 成功完成之前不确认任何消息吗?

最佳答案

关于java - 在 IntegrationFlow 服务激活器方法成功返回之前不确认 RabbitMQ 消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47455784/

相关文章:

java - 链接给出了来自代码的无效响应代码,但来自浏览器的有效响应代码

java - 从连接池获取连接的静态类

java - 在JAVA中将字符串分成子字符串

java - 没有使用 Spring Security 进行身份验证和授权

java - Spring Integration 入站 JMS 与 JBoss createConnection() : IllegalArgumentException: ClassCastException

java - HttpRequestHandlingMessagingGateway PayloadExpression java 配置

java - 在java中使用jfree图表制作动态折线图

java - 如何在 spring-boot 项目中的 pom.xml 添加非 spring-boot 依赖

json - 如何在 Spring Boot 中覆盖默认的 JSON 响应

java - 使用 Spring Integration JDBC 出站网关更新数据