java - 升级到 Spring Integration 5 后消费消息不再有效

标签 java spring-boot spring-integration spring-integration-dsl

我正在尝试将使用 Spring Integration 4.3 和 Spring Boot 1.6 的项目升级到 Spring Integration 5.1 和 Spring Boot 2.1。以前我有以下配置:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("myId")
                    .autoStartup(autoStartup)
                    .prefetchCount(10)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> a.correlationExpression("payload.entityId")
                                    .releaseExpression("size() eq iterator().next().payload.batchSize")
                                    .sendPartialResultOnExpiry(true)
                                    .groupTimeout(2000)
                                    .expireGroupsUponCompletion(true)
                                    .outputProcessor(myMessageGroupProcessor))
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

在升级过程中,我尝试遵循文档 here因此将配置更改为:

@Configuration
@EnableAutoConfiguration
@EnableIntegration
public class SpringConfig {

    @Bean(name = "myFlowId")
    public IntegrationFlow myFlow(ConnectionFactory connectionFactory, ServiceActivatorBean serviceActivatorBean,
                                  @Value("${spring.integration.flow.auto-startup:true}") boolean autoStartup,
                                  MyMessageGroupProcessor myMessageGroupProcessor) {
        IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                        .id("myId")
                        .autoStartup(autoStartup)
                        .configureContainer(c -> c.acknowledgeMode(MANUAL)
                            .prefetchCount(10)
                            .concurrentConsumers(2)
                            .maxConcurrentConsumers(3)
                        )
                        .messageConverter(messageConverter()))
                        .aggregate(a -> a.correlationExpression("payload.entityId")
                                        .releaseExpression("size() eq one().payload.batchSize")
                                        .sendPartialResultOnExpiry(true)
                                        .groupTimeout(2000)
                                        .expireGroupsUponCompletion(true)
                                        .outputProcessor(myMessageGroupProcessor))
                        .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                        .get();
    }
}

但是当我发布消息时,集成流程似乎没有接收/处理它们。我没有收到任何错误日志(即使我启用了调试日志记录,也没有收到任何日志),而且我不太确定从哪里开始调试。我确信消息实际上已发布到 RabbitMQ,所以这不是问题。我可能会错过什么?

最佳答案

我的问题实际上不是由于 Spring Integration,而是与 Spring AMQP 的更改有关。以前“可声明”可以这样创建:

@Bean
List<Binding> myBinding() {
    return List.of(<binding1>, <binding2>, ..)
}

但在 Spring AMQP 2.1 中应更改为:

@Bean
Declarables myBinding() {
    return new Declarables(List.of(<binding1>, <binding2>, ..))
}

查看文档 here .

顺便说一句,我的releaseExpression也错了,应该是size() eq one.payload.batchSize

关于java - 升级到 Spring Integration 5 后消费消息不再有效,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55918779/

相关文章:

java - 在 hadoop-examples jar 文件上运行 wordcount 时出现 "Not a valid JAR"

spring - 如何从Spring Data REST产生的表示中删除超媒体元素?

spring - 如何使用 Spring Integration 移入 GCP Storage 后从本地目录中删除文件

java - 列表中的 SQL 异常

Java 防止在类外调用私有(private)或 protected 方法

spring-mvc - 新的Spring MVC 4中的样式表

java - 如何根据SFTP适配器的Spring集成中的文件名生成传入文件的动态本地目录

java - com.jcraft.jsch.JSchException : verify: false 错误

java - 在 Java8 中使用 lambda 仅在不为空时过滤值

java - Jlink Spring Boot