我有一个批处理路由,它使用文件夹中的 XML 文件。它过滤、转换并最终将分组文档保存到磁盘。由于这是一个批处理路由,我要求在对源文件夹进行一次轮询后将其关闭,这就是下面代码中 RouteTerminator 的用途。 (它使用 routeID
在 camelContext
上调用 stopRoute()
和 removeRoute()
。)
from("file:" + sourcePath)
.filter().xquery("//DateTime > xs:dateTime('2013-05-07T23:59:59')")
.filter().xquery("//DateTime < xs:dateTime('2013-05-09T00:00:00')")
.aggregate(constant(true))
.completionFromBatchConsumer()
.groupExchanges()
.to("xquery:" + xqueryPath)
.to("file:" + targetPath)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
new RouteTerminator(routeID, exchange.getContext()).start();
}
})
.end();
这会在单个文件收集后正确关闭路由,并且在 onException
中重复该过程后,它还会在引发异常时优雅地关闭路由。不幸的是,如果路由过滤掉每个 Exchange,它就永远不会到达处理器。相反,在过滤过程中交换会被丢弃,并且路由保持开放。
我想将过滤器移到aggregate
调用中,因为这可能会使路由持续到最后,但此方法不接受 XQuery 过滤器。 XPath 不是一个选项,因为它不支持日期时间比较。
在这种情况下如何强制整个路线停止?
最佳答案
I tried again and now have a solution where I call setHeader to set a Filtered header.
Unfortunately I can't seem to break out of the choice to use it as a simple switch/case so I have to route both the .when() and .otherwise() to the same second direct route.
In that route I then aggregate and call a basic merge bean that builds a Document out of every Exchange and adds it to a GenericFile if the header matches. It seems like there should be an easier way to simply set a header based on an xquery though...
关于java - 当所有消息都被过滤后停止路由,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16440858/