java - What is the proper way to advise a failing transform with the Spring Integration Java DSL

Tags: java spring transform spring-integration dsl

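I have the "happy path" working (see below).

How do I advise a .transform call so that it invokes an error flow (via errorChannel) without interrupting mainFlow?

Currently mainFlow terminates at the first failure in the second .transform (when the payload cannot be deserialized to the target type). The behavior I want is to log the failure and continue processing.

I have read about ExpressionEvaluatingRequestHandlerAdvice. Would I add a second parameter to each .transform call, such as e -> e.advice(myAdviceBean), and declare such a bean with a success channel and an error channel? I assume I would need to break up my mainFlow in order to receive the success output of each transform.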
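The "log and continue" behavior asked for here can be modeled in plain Java, independent of Spring Integration. The sketch below is only an analogy for what an advice with a failure channel and trapped exceptions is meant to achieve; `TrapAndContinue`, `advise`, `parseAll`, and the sample records are hypothetical names, not part of any Spring API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

public class TrapAndContinue {

    // Wraps a transformer so that a failure is reported to failureSink
    // and swallowed, instead of propagating and stopping the caller.
    static <I, O> Function<I, Optional<O>> advise(Function<I, O> transformer,
                                                  Consumer<RuntimeException> failureSink) {
        return input -> {
            try {
                return Optional.of(transformer.apply(input));
            } catch (RuntimeException e) {
                failureSink.accept(e);    // analogous to sending to an error channel
                return Optional.empty();  // no result for this record
            }
        };
    }

    // Runs every record through the advised transformer and keeps only the successes.
    static List<Integer> parseAll(List<String> records, List<String> errors) {
        Function<String, Optional<Integer>> advised =
                advise(Integer::parseInt, e -> errors.add(e.getMessage()));
        List<Integer> parsed = new ArrayList<>();
        for (String record : records) {
            advised.apply(record).ifPresent(parsed::add);
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        List<Integer> parsed = parseAll(Arrays.asList("1", "oops", "3"), errors);
        System.out.println(parsed + " with " + errors.size() + " failure(s)");
    }
}
```

The bad record is reported once and the loop moves on to the next record, which is the outcome the question wants from the advised .transform step.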

Following direction from some comments, I updated the original code sample. But I still can't get it "all the way home".

2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG handler 'ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
 at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
 at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'mainFlow.channel#3', message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.aggregator.AggregatingMessageHandler DEBUG org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] com.xxx.DataMigrationModule$ErrorService ERROR org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:48)
    at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?
    at org.springframework.util.Assert.state(Assert.java:385)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:369)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    ... 22 more

UPDATED (09-08-2015)

code sample

@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(
                    amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            .split(f -> new FileSplitter())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .transform(Transformers.fromJson(persistentType()), e -> e.advice(handlingAdvice()))
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> 
                            a.releaseStrategy(g -> g.size() == persistenceBatchSize)
                            .expireGroupsUponCompletion(true)
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("size() ge 2 ? 10000 : -1")
                            , null
            )
            .handle(jdbcRepositoryHandler())
            // TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
            .get();
    // @formatter:on
}

@Bean
public ErrorService errorService() {
    return new ErrorService();
}

@Bean
public MessageChannel customErrorChannel() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow customErrorFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(customErrorChannel())
            .handle("errorService", "handleError")
            .get();
    // @formatter:on
}

@Bean
ExpressionEvaluatingRequestHandlerAdvice handlingAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnFailureExpression("payload");
    advice.setFailureChannel(customErrorChannel());
    advice.setReturnFailureExpressionResult(true);
    advice.setTrapException(true);
    return advice;
}

protected class ErrorService implements ErrorHandler {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Override
    public void handleError(Throwable t) {
        stopEndpoints(t);
    }

    private void stopEndpoints(Throwable t) {
        log.error(ExceptionUtils.getStackTrace(t));
    }

}

Best answer

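It turns out I had gone wrong in several places:

  • I had to autowire a Jackson2 ObjectMapper (which I get from Spring Boot auto-configuration) and construct a JsonObjectMapper instance to pass as the second argument to Transformers.fromJson. This decodes to the persistent type more leniently (it stops the UnrecognizedPropertyException), which removes the need for ExpressionEvaluatingRequestHandlerAdvice.
  • I had to choose the correct variant of the .split method in IntegrationFlowDefinition in order to use FileSplitter. Otherwise you do not get this splitter but something more like a DefaultMessageSplitter, which terminates the flow early after reading the first record from the InputStream.
  • I moved transform, aggregate, and handle into their own pubsub channel backed by an asynchronous task executor.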
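The answer does not spell out why `.split(f -> new FileSplitter())` failed to install the FileSplitter. One plausible explanation, offered here as an assumption rather than a statement about the DSL's internals, is a general Java rule: a lambda whose body is an object-creation expression is also compatible with a void functional interface such as Consumer, in which case the new object is silently discarded. The sketch below uses hypothetical stand-ins (`Spec` and two `split` overloads) rather than real Spring Integration classes.

```java
import java.util.function.Consumer;

public class DiscardedLambdaResult {

    // Hypothetical stand-in for an endpoint spec that starts with a default splitter.
    static final class Spec {
        String splitter = "default";
    }

    // Variant 1: takes only a configurer for the default splitter.
    static String split(Consumer<Spec> configurer) {
        Spec spec = new Spec();
        configurer.accept(spec);
        return spec.splitter;
    }

    // Variant 2: takes the splitter explicitly, plus an optional configurer.
    static String split(String splitter, Consumer<Spec> configurer) {
        Spec spec = new Spec();
        spec.splitter = splitter;
        if (configurer != null) {
            configurer.accept(spec);
        }
        return spec.splitter;
    }

    public static void main(String[] args) {
        // Compiles, but the StringBuilder is created and thrown away.
        System.out.println(split(f -> new StringBuilder("file")));
        // The explicit variant actually uses the splitter that was passed in.
        System.out.println(split("file", null));
    }
}
```

Under this reading, passing the splitter instance as its own argument, as in `.split(new FileSplitter(), null)`, avoids the problem because the instance is no longer hidden inside a lambda body.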

This still does not meet 100% of my needs, but it is a lot further along.

Here is what I ended up with:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class DataMigrationModule {

private final Logger log = LoggerFactory.getLogger(getClass());

@Value("${cloud.aws.credentials.accessKey}")
private String accessKey;

@Value("${cloud.aws.credentials.secretKey}")
private String secretKey;

@Value("${cloud.aws.s3.bucket}")
private String bucket;

@Value("${cloud.aws.s3.max-objects-per-batch:1024}")
private int maxObjectsPerBatch;

@Value("${cloud.aws.s3.accept-subfolders:false}")
private String acceptSubFolders;

@Value("${cloud.aws.s3.remote-directory}")
private String remoteDirectory;

@Value("${cloud.aws.s3.local-directory-ref:java.io.tmpdir}")
private String localDirectoryRef;

@Value("${cloud.aws.s3.local-subdirectory:target/s3-dump}")
private String localSubdirectory;

@Value("${cloud.aws.s3.filename-wildcard:}")
private String fileNameWildcard;

@Value("${app.persistent-type:}")
private String persistentType;

@Value("${app.repository-type:}")
private String repositoryType;

@Value("${app.persistence-batch-size:2500}")
private int persistenceBatchSize;

@Value("${app.persistence-batch-release-timeout-in-milliseconds:5000}")
private int persistenceBatchReleaseTimeoutMillis;

@Autowired
private ListableBeanFactory beanFactory;

@Autowired
private ObjectMapper objectMapper;

private final AtomicBoolean invoked = new AtomicBoolean();

private Class<?> repositoryType() {
    try {
        return Class.forName(repositoryType);
    } catch (ClassNotFoundException cnfe) {
        log.error("Unknown repository implementation!", cnfe);
        System.exit(0);
    }
    return null;
}

private Class<?> persistentType() {
    try {
        return Class.forName(persistentType);
    } catch (ClassNotFoundException cnfe) {
        log.error("Unsupported type!", cnfe);
        System.exit(0);
    }
    return null;
}

public Date nextExecutionTime(TriggerContext triggerContext) {
    return this.invoked.getAndSet(true) ? null : new Date();
}

@Bean
public FileToInputStreamTransformer unzipTransformer() {
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);
    return transformer;
}

@Bean
public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
    AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
    AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
    messageSource.setCredentials(credentials);
    messageSource.setBucket(bucket);
    messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
    messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
    messageSource.setRemoteDirectory(remoteDirectory);
    if (!fileNameWildcard.isEmpty()) {
        messageSource.setFileNameWildcard(fileNameWildcard);
    }
    String directory = System.getProperty(localDirectoryRef);
    if (!localSubdirectory.startsWith("/")) {
        localSubdirectory = "/" + localSubdirectory;
    }
    if (!localSubdirectory.endsWith("/")) {
        localSubdirectory = localSubdirectory + "/";
    }
    directory = directory + localSubdirectory;
    FileUtils.mkdir(directory);
    messageSource.setDirectory(new LiteralExpression(directory));
    return messageSource;
}

@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(
                    amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            .split(new FileSplitter(), null)
            .publishSubscribeChannel(new SimpleAsyncTaskExecutor(), p -> p.subscribe(persistenceSubFlow()))
            .get();
    // @formatter:on
}

@Bean
public IntegrationFlow persistenceSubFlow() {
    JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
    ReleaseStrategy releaseStrategy = new TimeoutCountSequenceSizeReleaseStrategy(persistenceBatchSize,
            persistenceBatchReleaseTimeoutMillis);
    // @formatter:off
    return f -> f
            .transform(Transformers.fromJson(persistentType(), jsonObjectMapper))
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(
                    a -> a
                        .releaseStrategy(releaseStrategy)
                        .correlationStrategy(m -> m.getHeaders().get("id"))
                        .expireGroupsUponCompletion(true)
                        .sendPartialResultOnExpiry(true)
                        , null
            )
            .handle(jdbcRepositoryHandler());
    // @formatter:on
}

@Bean
public JdbcRepositoryHandler jdbcRepositoryHandler() {
    return new JdbcRepositoryHandler(repositoryType(), beanFactory);
}

protected class JdbcRepositoryHandler extends AbstractMessageHandler {

    @SuppressWarnings("rawtypes")
    private Insertable repository;

    public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
        repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
    }

    @Override
    protected void handleMessageInternal(Message<?> message) {
        repository.insert((List<?>) message.getPayload());
    }

}

protected class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    @Override
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload));
    }
}

}

Regarding "java - What is the proper way to advise a failing transform with the Spring Integration Java DSL", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/32442271/
