我有这样的集成流程:
@Bean
public IntegrationFlow inboundRequestFlow()
{
return IntegrationFlows.from( inboundRequestGateway() )
.log( ... )
.filter( requestValidator,
spec -> spec.discardChannel( invalidRequestChannel() ) )
.bridge( spec -> spec.requiresReply( false ) )
.channel( asyncFlowChannel )
.wireTap(
//sends a NO_CONTENT reply if the request is ok
flow -> flow.enrichHeaders( spec -> spec.header( HttpHeaders.STATUS_CODE, HttpStatus.NO_CONTENT ).defaultOverwrite( true ) )
.transform( payload -> "" )
.channel( inboundGatewayReplyChannel() )
).get();
}
它在 http 网关上接收请求,对其进行验证,如果一切正常,则将请求发送到“asyncFlowChannel”并使用 204 回复入站网关。
“asyncFlowChannel”是在执行程序 channel 上运行的另一个 IntegrationFlow 的起点:
@Bean
public IntegrationFlow outboundFlow()
{
return IntegrationFlows.from( asyncFlowChannel)
.log( ... )
.transform( ... )
.transform( ... )
.split(... )
.resequence( ... )
.enrichHeaders( ... )
.log( ... )
.transform( ... )
.handle( this.outboundSOAPGateway() )
.log( .. )
.handle( ... )
.bridge( spec -> spec.requiresReply( false ) )
.channel( anotherAsyncFlowChannel )
.get();
}
如果我的 outboundGateway 发生异常(由于网络相关的 IO 错误或错误响应),我想记录该错误并采取适当的措施。但我无法在 outboundSOAPGateway 上设置错误 channel ,并且启动流上的 inboundRequestGateway 已收到回复。
我得到的错误的唯一线索是这个日志:
10:19:53.002 WARN [outbound-flow-0] org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel - Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=..., headers=...]
我的问题是:在异步流中处理出站网关上的错误(其中启动流的入站网关已收到其回复)的正确方法是什么?
最佳答案
任何 MessageHandler
端点都可以通过 AbstractRequestHandlerAdvice
提供。其中之一是ExpressionEvaluatingRequestHandlerAdvice
,您可以在其中捕获异常并将其发送到failureChannel
:https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice .
为此目的,.handle( this.outboundSOAPGateway() )
可以与第二个参数一起提供,例如:
.handle((GenericHandler<?>) (p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
在本例中我使用
@Bean
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
return requestHandlerRetryAdvice;
}
但这同样适用于 ExpressionEvaluatingRequestHandlerAdvice
。
顺便说一句,retryAdvice()
也可能对您有用。请参阅其 ErrorMessageSendingRecoverer
。
关于java - 异步流中出站网关的错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48061376/