所以设置如下:
<tx:advice id="txAdvice2" transaction-manager="dataSourceTransactionManager">
<tx:attributes>
<tx:method name="*" rollback-for="Throwable" no-rollback-for="ListenerExecutionFailedException"/>
</tx:attributes>
</tx:advice>
<int-amqp:inbound-channel-adapter channel="input-channel" queue-names="probni" message-converter="jsonMessageConverter"
channel-transacted="true"
advice-chain="txAdvice2" />
<int:chain input-channel="input-channel" output-channel="output-channel">
<int:service-activator ref="h1Handler" method="handle" />
<int:service-activator ref="h2Handler" method="handle" />
<int:service-activator ref="h3Handler" method="handle" />
<int:splitter />
</int:chain>
<int-amqp:outbound-channel-adapter channel="output-channel" exchange-name="outputit" amqp-template="rabbitTemplate" />
如果在此线程执行期间(因为所有此链 amqpIN-process-amqpOUT 都应在单线程中执行)我抛出 ListenerExecutionFailedException,dataSourceTransactionManager 将执行提交,但 amqp 也会重新排队消息,因为传播了异常。
在这种情况下,我怎样才能告诉rabbit成功确认消息?
此外,我发现我必须放入 no-rollback-for 属性实际异常类,因为我的内部异常仅存储在“cause”属性中,而 RuleBasedTransactionAttribute 不会检查该属性。
还有一件事,如果我像这样进行配置:
<int-amqp:inbound-channel-adapter channel="input-channel" queue-names="probni" message-converter="jsonMessageConverter"
channel-transacted="true"
transaction-manager="dataSourceTransactionManager"
transaction-attribute="transactionAttribute" />
transactionAttribute(RuleBasedTransactionAttribute)根本不被考虑,即使我正确设置了 no-rollback-for,dataSourceTransactionManager 也总是回滚。
谢谢!
最佳答案
您可以将自定义 ErrorHandler
添加到监听器容器(您必须在外部配置容器并在 )。container
属性中提供引用
默认错误处理程序是一个带有 DefaultExceptionStrategy
的 ConditionalRejectingErrorHandler
,它将某些 LEFE 原因异常视为致命的:
private boolean isCauseFatal(Throwable cause) {
return cause instanceof MessageConversionException
|| cause instanceof org.springframework.messaging.converter.MessageConversionException
|| cause instanceof MethodArgumentNotValidException
|| cause instanceof MethodArgumentTypeMismatchException
|| cause instanceof NoSuchMethodException
|| cause instanceof ClassCastException
|| isUserCauseFatal(cause);
}
从版本 1.6.4 开始,您可以对默认的 DefaultExceptionStrategy
进行子类化,并将您的原因添加到 isUserCauseFatal()
。
在 1.6.4 之前,您必须提供自己的 FatalExceptionStrategy
(或错误处理程序实现)。
对于致命原因,处理程序会抛出 AmqpRejectAndDontRequeueException
,告诉容器 nack(而不是重新排队)消息。
编辑
顺便说一下,你不需要包装异常,容器会为你做这件事......
protected Exception wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Message message) {
if (!(e instanceof ListenerExecutionFailedException)) {
// Wrap exception to ListenerExecutionFailedException.
return new ListenerExecutionFailedException("Listener threw exception", e, message);
}
return e;
}
编辑2
我的错误,可以使用error-handler
属性指定ErrorHandler
。
编辑3
或者,只需抛出一个AmqpRejectAndDontRequeueException
(它将被包装在LEFE
中)。
关于单事务中的 Spring 集成和 JDBC,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41023132/