我有一个 Camel 路由,它从队列中取出一条消息,将它发送到一个 bean 进行处理,然后将消息排回另一个队列。
我正在尝试消除第二个队列中的“重复消息”。 Camel 是否有任何端点、处理器、EIP 等,我可以配置这些端点、处理器、EIP 等,以便在消息被发送到第二个队列之前在途中对消息进行重复数据删除?
例子:
<route id="myRoute">
<from uri="{{queue-1-uri}}" />
<to uri="bean:myBean?method=process" />
<!-- How to dedupe right here??? -->
<to uri="{{queue-2-uri}}" />
</route>
更新:可能是这样的:
<route id="myRoute">
<from uri="{{queue-1-uri}}" />
<to uri="bean:myBean?method=process" />
<filter>
<method>what goes here???</method>
<to uri="{{queue-2-uri}}" />
</filter>
</route>
根据 Ralf 的建议,我可以在 <method></method>
中引用一个 bean。然后使用缓存将消息保存在内存中。
假设这个新 bean 被称为 FilterBean
它有一个 dedupe()
它的方法:如何在 Spring XML 中连接它,bean 需要实现哪些类/接口(interface)才能从路由内部调用?
最佳答案
我认为您正在寻找 Camel 提供的 Idempotent Consumer
。根据您的需要,有不同的方法,例如内存、JDBC
、Hazelcast
...我不确定它是否适合您,因为您使用的是 bean
在消费者之后,但值得一试。 Camel网站的简单示例如下:
<!-- repository for the idempotent consumer -->
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<idempotentConsumer messageIdRepositoryRef="myRepo">
<!-- use the messageId header as key for identifying duplicate messages -->
<header>messageId</header>
<!-- if not a duplicate send it to this mock endpoint -->
<to uri="mock:result"/>
</idempotentConsumer>
</route>
</camelContext>
您可以在此处找到更多信息:Idempotent Consumer
关于java - Camel EIP 过滤重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21761470/