我需要从队列中读取消息并调用 REST 服务进行发布。
然后,我正在研究Camel的JMS事务,看起来你可以设置一个maximumRedeliveries
来再次处理队列消息,所以在失败的情况下回滚事务,我是想知道如果在同一个 Camel route 我们必须调用 REST 服务来发布某些内容,那么该部分如何回滚?
最大交付配置:
errorHandler(new TransactionErrorHandlerBuilder()
.loggingLevel(LoggingLevel.ERROR)
.useOriginalMessage()
.maximumRedeliveries(2)
.logHandled(false)
.logExhausted(true)
);
路线的伪代码:
//Reading message from the queue
from("activemq:AMQ.App.EMC2.In.PMQueue?jmsMessageType=Bytes").
transacted().
unmarshal(jaxbDataFormat).bean(pmMessageEnricher).
to("direct:start-post");
//Then doing the post
from("direct:start-post").
setHeader(Exchange.HTTP_METHOD, constant("POST")).
setHeader(Exchange.CONTENT_TYPE, constant("application/json")).
setBody(constant(pmMessageEnricher.toJson())).
to("http://xxx").
to("direct:start-somethingelse");
//Then doing something else
from("direct:start-somethingelse").
blabla...
假设在 start-somethingelse
中发生异常,如何回滚 REST post 调用?因为我们以无状态的方式调用外部服务。
最佳答案
你的怀疑是正确的。在 JMS 事务回滚的情况下,POST 请求无法回滚,因为服务提供者不是 JMS 事务的一部分。该事务仅在 JMS 代理和 Camel JMS 使用者之间进行(另请参阅 Camel transactional client )。
但是,如果您发现处理错误,则可以应用所需的补偿逻辑。例如,通过另一个请求删除已发布的数据。
顺便说一句:不要混淆 Camel 重新交付和经纪人重新交付!
Camel 归还由 Camel Errorhandler 完成(不是经纪人)。在您的示例中,它最多执行 2 次重新交付。但请注意,Camel 重新交付不会重新处理整个路线,而只是重新处理发生故障的处理器。
因此,如果 to("http://xxx")
失败并且 Camel Errorhandler 重新交付,Camel 仅重试 to("http://xxx")
.
相反,如果您的 JMS 事务回滚,代理会将消息重新传递给 Camel,并再次处理整个路由。
请注意不要使用 Camel Errorhandler“掩盖”JMS 重新交付。
关于java - 带有 JMS 事务和 REST 后调用的 Camel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53391511/