java - 用于多个 JSON 对象模式的 Spring IntegrationFlow

标签 java json spring amazon-web-services spring-integration

我尝试通过设置轮询器定期从队列中轮询 (JSON) 消息并处理消息然后保存到我的数据库来实现 Spring IntegrationFlow 以使用 AWS SQS 队列。

我成功地从队列中轮询单个 JSON 消息模式并转换为我的自定义对象。现在我有 2 种类型的 JSON 模式发送到同一个 SQS 队列。例如,

`"Type" : "Notification",
"MessageId" : "xxxx-xxxx-xxxx",
"TopicArn" : "arn:aws:sns:us-west-2:xxxxx:topicName00",
"Subject" : "OK: \"test00\" in US-West-2",
"Message" : "{\"AlarmName\":\"test00\..."`

`"Type" : "Notification",
"MessageId" : "xxxxx-xxxx-xxxxx",
"TopicArn" : "arn:aws:sns:us-west-2:xxxxxx:topicName01",
"Message" : "{\"version\":\"0\",\"id\":\"xxxxx\",\"detail-type\":\"EC2 Instance State-change Notification\",\"source\":\"aws.ec2\..."`

这些消息被发送到同一个队列,我想使用同一个轮询器轮询队列,然后根据消息正文将消息路由到不同的转换器和 serviceActivator(handle)。

@Bean
public IntegrationFlow sqsIntegrationFlow()
{
    return IntegrationFlows.from( this.sqsMessageSource(), c -> c.poller( myPoller() ) )
            .channel( new DirectChannel() )
            .<Payload,Boolean>route( input -> input.value().contains( "EC2 Instance State-change Notification" ),
                    mapping -> mapping
                            .subFlowMapping( "true", sf -> sf.channel( new DirectChannel() )
                                    .transform(
                                            SqsMessageToInstanceConverter::convertSqsMessagesToInstanceInfo )
                                    .channel( new DirectChannel() ).handle( ( message ) -> {
                                        ec2InstanceService.updateInstanceInfo( (List<SqsMessageResult>) message.getPayload() );
                                    } ) )
                            .subFlowMapping( "false", sf -> sf.channel( new DirectChannel() )
                                    .transform( SqsMessageToInstanceConverter::convertSqsMessageToAlarmInfo )
                                    .channel( new DirectChannel() ).handle( (alarm -> {
                                        cwAlarmService.updateAlarmInfo(
                                                (List<SqsAlarmMessageResult>) alarm.getPayload() );
                                    }) ) ) )
            .get();
}

我尝试如上所述使用路由器,并使用消息正文中的字符串(“EC2 实例状态更改通知”)识别消息,但出现错误

java.lang.ClassNotFoundException:org.springframework.integration.support.management.MappingMessageRouterManagement

我的问题是:

1.这是路由器的正确使用方式吗?

2。如何实现使用集成流程处理 2 条不同 JSON 消息的目标?

最佳答案

是的,它是正确的(我大约一个小时前写了一个类似的流程)。看起来你有某种类路径问题 - 该接口(interface)与路由器位于同一个 jar 中。您在哪里运行您的应用程序?

尝试使用 -verbose JVM arg 运行,我刚刚做了并得到了这个...

...
[Loaded org.springframework.integration.router.AbstractMessageRouter from file:/Users/.../.m2/repository/org/springframework/integration/spring-integration-core/4.2.5.RELEASE/spring-integration-core-4.2.5.RELEASE.jar]
[Loaded org.springframework.integration.support.management.MappingMessageRouterManagement from file:/Users/.../.m2/repository/org/springframework/integration/spring-integration-core/4.2.5.RELEASE/spring-integration-core-4.2.5.RELEASE.jar]
...

关于java - 用于多个 JSON 对象模式的 Spring IntegrationFlow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37336196/

相关文章:

java - 如何检查 Java 枚举中的值?

java - 最小化所有应用程序

java - Generic Dao 的设计模式,Spring 中的服务层,不使用 Hibernate

c++ - 使用 jsoncpp 创建空的 json 数组

java - 从AJAX接收HTTP POST,并在后端java中读取它

java - Spring Boot 记录器换行符

java - 是否可以使用 Spring 配置在运行时设置 JMS 监听器目标

spring - 如何在 Spring Reactor Web 应用程序中执行一系列操作并确保一个操作在下一个操作之前完成?

java - hibernate 恩维尔 : @AuditJoinTable rows missing

javascript - AJAX jquery json发送数组到php