我的项目中存在效率问题,该项目使用 Camel 和 Esper component 。
我有几个外部数据源向 Camel 端点提供信息。每个接收数据的 Camel 端点都会将其传输到处理该数据的路由,然后将其传递到 Esper 端点。
下图说明了这种行为:
效率问题在于所有这些都是由单个Java线程完成的。因此,如果我有很多来源,就会出现巨大的瓶颈。
以下代码准确地说明了图像的情况:
public final void configure() throws OperationNotSupportedException{
RouteDefinition route = from("xmpp://localhost:5222/?blablabla...");
// apply some filter
FilterDefinition filterDefinition = route.filter().method(...);
// apply main processor
ExpressionNode expressionNode = filterDefinition.process(...);
// set destination
expressionNode = filterDefinition.to("esper://session_X");
}
要解决这个问题,我必须使用线程池或使用某种并行处理来处理这种情况。我不能使用多播、收件人列表等模式,因为所有这些都会发送向多个端点/客户端发送相同的消息,但我的示例中并非如此。
一个可能的解决方案是每个“数据源端点 -> 路由 -> Esper 端点”组合有 1 个线程,如下图所示:
另一种可能的解决方案是让 1 个线程从数据源接收所有内容,然后将其分派(dispatch)给与另一个端点一起处理路由处理的多个线程:
PS:我愿意接受您可能提出的任何其他可能的建议。
为了实现其中之一,我考虑使用 Camel SEDA component然而,这个组件似乎不允许我拥有动态线程池,因为 concurrentConsumers
属性是静态的。此外,我不确定是否可以使用 SEDA 端点,因为我相信(尽管我不完全确定)端点的语法 .to("seda:esper://session_X?concurrentConsumers =10")
对于 Camel 无效。
所以,此时我很迷茫,不知道该怎么办: - SEDA 是我正在寻找的解决方案吗? - 如果是,鉴于语法问题,如何将其与 Esper 端点集成? - 有没有其他解决方案/Camel 组件可以解决我的问题?
最佳答案
您必须定义一个单独的 seda
路由,将消息分发到 esper 引擎,例如(使用流畅样式):
public final void configure() throws OperationNotSupportedException{
from("xmpp://localhost:5222/?blablabla...")
.filter().method(...)
.process(...)
.to("seda:sub");
from("seda:sub?concurrentConsumers=10)
.to("esper://session_X");
}
也就是说,只有在丢失消息不是问题的情况下才应使用seda
。否则,您应该使用更强大的协议(protocol),例如允许保留消息的jms
。
编辑:
除了 seda
之外,您还可以使用 threads()
,其中您可以通过定义 ExecutorService
来自定义线程行为:
public final void configure() throws OperationNotSupportedException{
from("xmpp://localhost:5222/?blablabla...")
.filter().method(...)
.process(...)
.threads()
.executorService(Executors.newFixedThreadPool(2))
.to("esper://session_X");
}
如果您使用seda
或threads()
,在发生故障时可能会失去事务安全性。对于这种情况或者如果您需要平衡多个远程主机的工作负载,您可以使用jms
。有关此解决方案的更多信息,请参阅 here .
关于java - 如何利用Camel提高效率?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21973917/