我想知道以下是否是 Camel 幂等消费者的预期行为:
我的路由有removeOnFailure=true,这意味着基本上当交换失败时幂等消费者应该从存储库中删除标识符。这带来了一个非常有趣的场景,允许在交易所上进行重复。
假设我有标识符=12345,并且第一次尝试执行交换是成功的,这意味着标识符已添加到幂等存储库中。下次尝试使用相同的标识符(即 12345)失败,因为它被捕获为重复消息(CamelDuplicateMessage)。但此时,removeOnFailure=true 将从存储库中删除标识符,下次尝试将允许交换成功完成,而不会捕获默认消息。因此,在交易所上创造了复制的空间。
有人可以建议这是预期行为还是某些错误吗?
示例路线:
from("direct:Route-DeDupeCheck").routeId("Route-DeDupeCheck")
.log(LoggingLevel.DEBUG, "~~~~~~~ Reached to Route-DeDupeCheck: ${property.xref}")
.idempotentConsumer(simple("${property.xref}"), MemoryIdempotentRepository.memoryIdempotentRepository()) //TODO: To replace with Redis DB for caching
.removeOnFailure(true)
.skipDuplicate(false)
.filter(exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
.log("~~~~~~~ Duplicate Message Found!")
.to("amq:queue:{{jms.duplicateQueue}}?exchangePattern=InOnly") //TODO: To send this to Duplicate JMS Queue
.throwException(new AZBizException("409", "Duplicate Message!"));
最佳答案
你的基本前提是错误的。
Next attempt to use same identifier i.e 12345 fails as this is caught as Duplicate Message (CamelDuplicateMessage)
当出现重复消息时,不视为失败。它只是在进一步处理中被忽略(除非您将 skipDuplicate
选项设置为 true)。
因此,您刚才解释的情况无论如何都不会发生。
测试起来非常容易。考虑到你有这样的路线,
public void configure() throws Exception {
//getContext().setTracing(true); Use this to enable tracing
from("direct:abc")
.idempotentConsumer(header("myid"),
MemoryIdempotentRepository.memoryIdempotentRepository(200))
.removeOnFailure(true)
.log("Recieved id : ${header.myid}");
}
}
还有这样的制片人
@EndpointInject(uri = "direct:abc")
ProducerTemplate producerTemplate;
for(int i=0, i<5,i++) {
producerTemplate.sendBodyAndHeader("somebody","myid", "1");
}
您在日志中看到的是
INFO 18768 --- [tp1402599109-31] route1 : Recieved id : 1
而且就一次。
关于apache-camel - Camel 幂等消费者对于removeOnFailure=true 的错误行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49173395/