java - Error message was not delivered - failedMessage is null?

Tags: java spring spring-integration

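I have a Spring Integration flow with a cron poller whose job is to initialize directories and MongoDB collections. If the initializer flow throws a RuntimeException, the framework sends a message to errorChannel. The error flow picks up the message, but then something goes wrong: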

2017-04-07 06:25:00.484 |  |  |  | [taskScheduler-8] | DEBUG | com.objectway.bacco.integration.flow.InitializerIntegrationFlowConfiguration | cleanTmp | INITIALIZATION - Directory bacco created
2017-04-07 06:25:04.857 |  |  |  | [taskScheduler-9] | ERROR | org.springframework.integration.handler.LoggingHandler | handleMessageInternal | org.springframework.messaging.MessagingException: The path [/tmp/bacco/bank_ftp/00082] does not denote a properly accessible directory.
    at org.springframework.integration.file.DefaultDirectoryScanner.listFiles(DefaultDirectoryScanner.java:83)
    at org.springframework.integration.file.FileReadingMessageSource.scanInputDirectory(FileReadingMessageSource.java:293)
    at org.springframework.integration.file.FileReadingMessageSource.receive(FileReadingMessageSource.java:272)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:191)
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:59)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:175)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:224)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:57)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:176)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:173)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:330)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2017-04-07 06:25:04.882 |  |  |  | [taskScheduler-9] | INFO | com.objectway.bacco.service.logging.logback.LogServiceLogback | info | null | null | null | ERROR - Processing error
2017-04-07 06:25:04.930 |  |  |  | [taskScheduler-9] | WARN | org.springframework.integration.channel.MessagePublishingErrorHandler | handleError | Error message was not delivered.
org.springframework.integration.transformer.MessageTransformationException: Failed to transform Message; nested exception is org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1007E:(pos 0): Property or field 'headers' cannot be found on null
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:95)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:227)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:176)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:85)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:58)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1007E:(pos 0): Property or field 'headers' cannot be found on null
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:90)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:89)
    ... 34 common frames omitted
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1007E:(pos 0): Property or field 'headers' cannot be found on null
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:220)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:81)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:51)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:87)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:120)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:267)
    at com.objectway.bacco.integration.component.analytics.ProcessTransformer.evaluateExpression(ProcessTransformer.java:118)
    at com.objectway.bacco.integration.component.analytics.ProcessTransformer.retrieveValueByKey(ProcessTransformer.java:115)
    at com.objectway.bacco.integration.component.analytics.ProcessTransformer.transform(ProcessTransformer.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:113)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:129)
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:49)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:347)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:166)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:317)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:155)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:93)

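From the documentation, section F.4: when exceptions occur during the execution of a scheduled poller task, those exceptions are wrapped in ErrorMessages and sent to the errorChannel. Is ErrorMessages the problem? I found the ErrorMessage class, but no ErrorMessages class.

Thanks

Edit: failedMessage is null. How is that possible?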

@Bean
public MessageSource<?> dateMessageSource() {
    MethodInvokingMessageSource source = new MethodInvokingMessageSource();
    source.setObject( new SystemTime() );
    source.setMethodName( "getTime" );
    return source;
}

@Bean
public IntegrationFlow initIntegrationFlow( ProcessTransformer processTransformer,
                                            MongoCleanerService mongoCleanerService,
                                            ControlComponentService controlComponentService,
                                            DirectoryService directoryService,
                                            @Value( "${output.directory}" ) String outputDirectory,
                                            @Qualifier( DAILY_POLLER ) PollerSpec pollerSpec ) {
    return IntegrationFlows.from( dateMessageSource(), c -> c.poller( pollerSpec ) )
            .enrichHeaders( headerEnricherSpec -> headerEnricherSpec.headerFunction( CORRELATION_ID, message -> "00000" )
                    .headerFunction( NETWORK, message -> "-ALL-" )
                    .headerFunction( GROUP, message -> "ALL" )
                    .headerFunction( DOMAIN, ( message ) -> ( INIT ) ) )
            .transform( new FlowLogger()
                    .level( INFO )
                    .expression( "'INITIALIZATION - Start Initializer, drop collections and temporary files'" )
                    .logService( logService ) )
            .transform( processTransformer )
            .handle( ( payload, headers ) -> {
                controlComponentService.manageComponent( "ftpExportFlow", "start" );
                mongoCleanerService.clean();
                directoryService.cleanTmp( outputDirectory );
                return new InitializationMessage();
            } )
            .channel( INITIALIZED_CHANNEL )
            .get();
}

@Bean
public IntegrationFlow endWithErrorFlowBean( ProcessRepository processRepository,
                                             AnalyticsService analyticsService,
                                             LogService logService ) {
    return IntegrationFlows
            .from( IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME )
            .transform( new FlowLogger().level( INFO ).expression( "'ERROR - Processing error'" )
                    .logService( logService ) )
            .transform( new ProcessTransformer()
                    .processRepository( processRepository )
                    .analyticsService( analyticsService )
                    .processStatusType( ProcessStatusType.PROCESS_WITH_ERROR ) )
            .transform( source -> {
                controlBus.sendCommand( "@ftpExportFlow.stop()" );
                return processRepository.findByProcessStatusType( ProcessStatusType.PROCESS_WITH_ERROR );
            } )
            .filter( processes -> ( ( List<Process> ) processes ).size() == 1 )
            .channel( REPORT_INPUT_CHANNEL )
            .get();
} 

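Best answer

The failure occurs before any message has been created, hence the null failedMessage. The exception is thrown while the polled message source is still scanning its directory: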

2017-04-07 06:25:04.857 | | | | [taskScheduler-9] | ERROR | org.springframework.integration.handler.LoggingHandler | handleMessageInternal | org.springframework.messaging.MessagingException: The path [/tmp/bacco/bank_ftp/00082] does not denote a properly accessible directory.

You only get a failedMessage property once a message has been created, which is typically the case when the error occurs in a downstream flow.
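A minimal sketch of how an error-flow component could guard against this case. To keep it runnable without Spring on the classpath, `Msg` and `MsgException` are hypothetical stand-ins for `org.springframework.messaging.Message` and `org.springframework.messaging.MessagingException`, and `headerOrDefault` is an illustrative helper, not part of Spring Integration or of the question's `ProcessTransformer`. The idea is simply to check `getFailedMessage()` for null before reading headers from it, instead of evaluating an expression such as `failedMessage.headers[...]` unconditionally.

```java
import java.util.Collections;
import java.util.Map;

class FailedMessageGuard {

    // Hypothetical stand-in for org.springframework.messaging.Message.
    static class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Hypothetical stand-in for MessagingException: failedMessage stays null
    // when the error was raised before any message existed (e.g. in the poller's source).
    static class MsgException extends RuntimeException {
        private final Msg failedMessage;

        MsgException(String description, Msg failedMessage) {
            super(description);
            this.failedMessage = failedMessage;
        }

        Msg getFailedMessage() {
            return failedMessage;
        }
    }

    // Reads a header from the failed message if there is one, otherwise returns the fallback.
    static Object headerOrDefault(Throwable error, String key, Object fallback) {
        if (!(error instanceof MsgException)) {
            return fallback; // not a messaging exception, so no failed message at all
        }
        Msg failed = ((MsgException) error).getFailedMessage();
        if (failed == null) {
            return fallback; // failure happened before a message was created
        }
        Object value = failed.headers.get(key);
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Collections.<String, Object>singletonMap("correlationId", "00000");

        // Error raised downstream: a failed message is attached.
        Throwable downstream = new MsgException("handler failed", new Msg("payload", headers));
        // Error raised by the message source: no message yet.
        Throwable atSource = new MsgException("directory not accessible", null);

        System.out.println(headerOrDefault(downstream, "correlationId", "unknown"));
        System.out.println(headerOrDefault(atSource, "correlationId", "unknown"));
    }
}
```

In a real error flow the same check would presumably be applied to the payload of the `ErrorMessage` (a `Throwable`), falling back to default values when the exception carries no failed message.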

Source: the original question "Error message was not delivered - failedMessage is null?" on Stack Overflow: https://stackoverflow.com/questions/43318551/
