java - Apache Camel - 拆分和聚合 - 旧交换始终为空

标签 java spring apache-camel

我看到这个问题已被问过很多次,但没有一个帖子有帮助或有一个决定性的解决方案。我正在拆分一条消息,然后使用 Aggregator2 将其聚合。代码抛出异常,因为 oldExchange 始终为空。所以为了测试我设计了一个小代码。

我读了一个 orders.xml 文件,看起来像这样

<Orders xmlns="http://some/schema/Order">
    <Order>
            <orderNum>1</orderNum>
    </Order>
    <Order>
            <orderNum>2</orderNum>
    </Order>
    <Order>
            <orderNum>3</orderNum>
    </Order>
    <Order>
            <orderNum>5</orderNum>
    </Order>
    <Order>
            <orderNum>6</orderNum>
    </Order>

我的 Camel 上下文看起来像这样

<camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split>
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:logQueries"/>
<camel:to uri="direct:aggegateQueries"/>  
</camel:split>

</camel:route>

<camel:route>
<camel:from uri="direct:logQueries"/>
<camel:log message="After the call : \n ${body}"></camel:log>  
</camel:route>

 <camel:route>
<camel:from uri="direct:aggegateQueries"/>
<camel:aggregate strategyRef="aggrTask" completionInterval="8000" >
<camel:correlationExpression>
<camel:xpath>//te:Order</camel:xpath>
</camel:correlationExpression>
<camel:to uri="file:src/data/catask/output?fileName=output.xml"/>  

</camel:aggregate>
</camel:route>  

我的聚合策略类看起来像这样

   public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
            if (oldExchange == null) { 
            System.out.println("Returning new exchange"); 
                return newExchange; 
            } 

            String oldBody = oldExchange.getIn().getBody(String.class); 
            String newBody = newExchange.getIn().getBody(String.class); 
            oldExchange.getIn().setBody(oldBody + "+" + newBody); 
            return oldExchange; 
        } 

问题是,当聚合结果保存在 output.xml 文件中时,它只包含它从 Orders.xml 中读取的最后一条记录。

<Order xmlns="http://some/schema/Order">
            <orderNum>6</orderNum>
    </Order>

我进一步调查并发现这是因为在第一次调用之后 oldExchange 应该有一些值但事实证明它总是空的。我认为因为它是从一个文件中读取所有内容并将其拆分,所以只有交换。

>有什么建议吗??

更新 1: Per Claus 我只能使用 Splitter 来解决这个问题。我这样做了,并且能够成功加入所有消息。但是我仍在寻找一种使用 Aggregator2 的方法。这是我如何仅使用 Splitter 完成的。

camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split strategyRef="aggrTask"> 
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:logQueries"/>
 < 
</camel:split>

</camel:route>

<camel:route>
<camel:from uri="direct:logQueries"/>
<camel:log message="After the call : \n ${body}"></camel:log>  
</camel:route>

最佳答案

我想我想通了如何使用 Aggregator 来聚合消息。我添加了一个名为 id 的 headerName 并将其用作我的相关 ID。

<camel:route>
  <camel:from uri="file:src/data/catask/test?noop=true"/>
  <camel:log message="${body}"></camel:log>
  <camel:split>
    <camel:xpath>//te:Orders/*</camel:xpath>
    <camel:to uri="direct:addHeaders"/>
    <camel:to uri="direct:aggegateQueries"/>
  </camel:split>
</camel:route>

<camel:route>
  <camel:from uri="direct:addHeaders"/>
  <camel:setHeader headerName="id">
    <camel:constant>order</camel:constant>
  </camel:setHeader>
</camel:route>

<camel:route>
  <camel:from uri="direct:aggegateQueries"/>
  <camel:aggregate strategyRef="aggrTask" completionInterval="8000" >
    <camel:correlationExpression>
      <simple>header.id</simple>
    </camel:correlationExpression>
    <camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
    <camel:log message="MERGED:: /n ${body}"></camel:log>
  </camel:aggregate>
</camel:route>  

这会汇总我的消息。但是我仍然不确定尽管使用了正确的 XPATH 为什么 Camel 认为它是不同类型的消息?

从 camel 论坛复制 CLAUS 的解释: “看起来你的相关表达式是一个新组 每条消息,例如每个 xpath 结果都是不同的。 如果您想拆分和加入相同的消息,请参阅此 eip http://camel.apache.org/composed-message-processor.html 并查看仅使用拆分器的示例。这更容易做到。 "

我使用 Xpath Evaluator 工具测试了 Xpath 表达式,还打印出了相关表达式结果,并且我所有带有//Order 的消息都相同。前-

Group 1: 
<Order>  
  <orderNum>1</orderNum>  
</Order>  

Group 2: 
<Order>  
  <orderNum>2</orderNum>  
</Order> 

关于java - Apache Camel - 拆分和聚合 - 旧交换始终为空,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21294160/

相关文章:

java - Spring Integration 是否有 JSR-303 验证过滤器实现?

java - 使用 Apache Camel 访问 Azure 服务总线?

java - Camel从FTP读取文件并存储在本地资源文件夹中

java - 如何解决java中的开放重定向问题

java - 如何将日期参数分配给当前时区的 hibernate 查询?

java - 在 Service 类中获取 Spring ThreadPoolExecutor

java - 如何执行我的 java 桌面应用程序的单个实例?

java - 提交表单后返回上一页(Spring/Hibernate)

java - Spring Boot 延迟 Flyway 初始化循环依赖

java - 如何设置 apache Camel Groovy 脚本组件的属性