java - Apache Camel : How do I use Aggregation to get the actual old exchange

标签 java apache-camel

大家好,

我目前有一个用例,我需要调用 route 的端点,但之后继续使用原始消息。
我发现这样做的方法是使用 multicast() ,然后使用 AggregationStrategy AggregationStrategies.useOriginal(),即 UseOriginalAggregationStrategy()
需要明确的是,我没有将消息发送到多个端点,只是使用 multicast() 将数据“重置”到之前的状态

现在我遇到了一个问题,我必须将一些信息从 multicast() 内部传播到外部路由并稍后使用它(下面的示例)。
它尝试使用更新的 UseOriginalAggregationStrategy() 来实现此目的,其中我仅“重置”Message,但保留 Exchange 不变,因此我可以更新属性,并且在 multicast()

之后仍然保留它们

聚合策略:

public class UseOriginalMessageAggregationStrategy implements AggregationStrategy {

  @Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    if (oldExchange != null && oldExchange.getIn() != null) {
      newExchange.setIn(oldExchange.getIn());
    }
    return newExchange;
  }
}

单元测试:

@Test
public void test() throws Exception {
  camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
      from("direct:test")
          .multicast(new UseOriginalMessageAggregationStrategy())
            .process(exchange -> { // This could also be a 'to()', it doesn't make a difference
              exchange.setProperty("c", 1);
              exchange.getIn().setHeader("d", 1);
            })
          .end()
          .log("Property a: ${exchangeProperty.a}")
          .log("Header b: ${header.b}")
          .log("Property c: ${exchangeProperty.c}")
          .log("Header d: ${header.d}");
    }
  });

  Exchange exchange = new ExchangeBuilder(camelContext)
      .withProperty("a", 1)
      .withHeader("b", 1)
      .build();
  camelContext.createProducerTemplate().send("direct:test", exchange);
}

现在我期望的是这个日志:

Property a: 1
Header b: 1
Property c: 1
Header d:

但实际发生的是:

Property a: 1
Header b: 1
Property c: 1
Header d: 1

请注意,“Header d”仍然被设置,即使它属于 multicast() 内的消息,它现在应该已经被覆盖了。

经过一些调试后,我注意到,发生这种情况是因为 AggregationStrategy 中的“oldExchange”实际上为 null,此时日志有意义,因为它正是被记录的“newExchange”。

现在我变得非常困惑。
即使 'oldExchange' 为 null,这意味着我们无法访问 multicast() 之前的 Exchange,UseOriginalAggregationStrategy() 仍然能够获取正是交换。

这是上面的 UnitTest 示例:

@Test
public void test() throws Exception {
  camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {

      from("direct:test")
          .multicast(AggregationStrategies.useOriginal()) // <- Using the camel AggregationStrategy now
            .process(exchange -> {
              exchange.setProperty("c", 1);
              exchange.getIn().setHeader("d", 1);
            })
          .end()
          .log("Property a: ${exchangeProperty.a}")
          .log("Header b: ${header.b}")
          .log("Property c: ${exchangeProperty.c}")
          .log("Header d: ${header.d}");
    }
  });

  Exchange exchange = new ExchangeBuilder(camelContext)
      .withProperty("a", 1)
      .withHeader("b", 1)
      .build();
  camelContext.createProducerTemplate().send("direct:test", exchange);
}

此日志(如您所料):

Property a: 1
Header b: 1
Property c:
Header d:

显然它只是通过返回 null 来实现这一点。
因此,当我在 AggregationStrategy 中返回 null 时,我得到了实际的初始 Exchange,但无法直接在 AggregationStrategy 中访问该 Exchange?
这对我来说有点奇怪,所以我猜我遗漏了一些东西。

最诚挚的问候
克里斯

长话短说: 如何将 Exchange 发送到另一个端点,并将消息重置为之前的状态,同时保留在该另一个端点中设置的 ExchangeProperties

最佳答案

您的目标是通过附加属性来丰富您的原始负载。 企业集成模式明智的丰富模式比多播更适合。 https://camel.apache.org/components/latest/eips/content-enricher.html以及该页面中的ExampleAggregationStrategy 类。

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {

        Object resourceResponse = resource.getIn().getBody();
        // retrieve stuff you want from resource exchange and add them as properties to to original
        return original;
    }
}

关于java - Apache Camel : How do I use Aggregation to get the actual old exchange,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62385400/

相关文章:

xpath - 带有节点列表的 Apache Camel XPath

java - Camel 按长度而不是按标记拆分 InputStream

java.lang.ClassCastException : ConditionalOnJava$JavaVersion cant be cast to org. springframework.boot.system.JavaVersion 错误

java - Apache-Camel 控制台输入

java - Apache Camel 中 Seda 的并发消费者

java - TreeMap 还是 HashMap ?

java - 在 apache camel 中找不到带有方案 http 错误的组件

java - 如何使用 Java 存储数据以在图表上显示?

java - 如何使用 Firebase 数据填充 Spinner?

java - JFrame 将位置设置在 MAC OSX 窗口的中心