我尝试处理 Spring 集成流程中发生的异常。
流程如下:
源 -> 分割 -> 句柄 -> 句柄
我的来源给出了一个对象列表作为有效负载。 拆分一次发出一个元素。
第一个处理程序遇到异常。 我预计,由于异常正在发布到下面示例中配置的错误 channel ,因此列表中的其他元素将继续发出。 但在第一个异常之后,流程停止了! 是否有我缺少的配置?
@Bean
public IntegrationFlow pubSubFlow(PublishSubscribeChannel publishSubscribeChannel,
@Qualifier("myMessagePublishingErrorHandler") MessagePublishingErrorHandler messagePublishingErrorHandler) {
return flow -> flow
.channel(publishSubscribeChannel)
.publishSubscribeChannel(config -> config
.subscribe(f1 -> f1
.split()
.handle("action", "act")
.handle(m1 -> System.out.println(">>>" + m1)))
.subscribe(f1 -> f1
.split()
.handle(m1 -> System.out.println("<<<" + m1)))
.errorHandler(messagePublishingErrorHandler));
}
错误处理程序:
@Bean
public MessagePublishingErrorHandler myMessagePublishingErrorHandler(@Qualifier("appErrorChannel") DirectChannel directChannel) {
MessagePublishingErrorHandler messagePublishingErrorHandler = new MessagePublishingErrorHandler();
messagePublishingErrorHandler.setDefaultErrorChannel(directChannel);
return messagePublishingErrorHandler;
}
@Bean
public DirectChannel appErrorChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow errorFlow(@Qualifier("appErrorChannel") DirectChannel directChannel) {
return IntegrationFlows.from(directChannel).handle(System.out::println).get();
}
最佳答案
我对您的 pubSubFlow
代码进行了一些重构,以修复错误并使其更具可读性。请考虑在将来尊重我们帮助您使问题尽可能清晰的努力。
所以,您的代码不反射(reflect)描述。您在每个发布-订阅分支中都有 .split()
,而不是像您希望的那样在主流中。
您在 publishSubscribeChannel
上有一个 messagePublishingErrorHandler
,因此,任何下游错误确实都会在那里处理,但这是针对发送到该子流的消息完成的。由于拆分器是该子流的一部分(对于一条传入消息),它肯定会停止工作,因为它刚刚冒泡了一个错误。
请重新考虑您想要从流程中获得什么。如果您只需要拆分一次然后发布-订阅,则将 .split()
移至 之前 publishSubscribeChannel()
。
但是请注意,如果配置了 Executor
,messagePublishingErrorHandler
将在 PublishSubscribeChannel
上工作。
无论如何,总有办法在拆分器之后放置一个 ExecutorChannel
来并行处理项目,并且不会因错误而影响主拆分循环。
关于java - 异常后未发出 Spring 集成消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49632773/