java - Camel advice not replacing the endpoint with the specified file

Tags: java apache-camel spring-camel

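I am new to Camel and, despite reading the Apache Camel documentation, I am still stuck on what is probably a small detail I have overlooked.

I have a Spring Boot application that defines a Camel route. The route consumes live prices in CSV format via an HTTP call, uses Bindy to convert the CSV into POJOs (LivePrice), and then persists the data to a store.

Here is the route definition: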

@Component
public class LivePricesCSVRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("kafka:" + "{{kafka.topic.live.prices.csv}}" + "{{kafka.broker.location}}")
                .routeId("live-prices-persistence-route")
                .transacted()
                .unmarshal()
                .bindy(BindyType.Csv, LivePrice.class).id("convertToCsv")
                .process(exchange -> {
                    List<LivePrice> object = (List<LivePrice>) exchange.getIn().getBody();
                    object.remove(0); // omit the header
                    logger.info(object);
                })
        .bean("livePriceServiceImpl", "populateLivePrices").id("populateLivePrices");
    }
}

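I want to create an integration test for this route in which, instead of expecting a message on the topic kafka.topic.live.prices.csv, I provide a test CSV file containing a header and two rows as input.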

Date,Symbol,Open,
2019-07-09,BTCUSD,12347.18
2019-07-08,BTCUSD,11475.07
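
To make the expected result concrete, here is a minimal plain-Java sketch of what should remain once the header row is dropped, mirroring the route's `object.remove(0)` step. It does not use Camel or Bindy; the class `SampleCsvRows` and the helper `dataRows` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SampleCsvRows {

    /** Splits CSV text into rows of fields and drops the first (header) row. */
    static List<String[]> dataRows(String csv) {
        List<String[]> rows = new ArrayList<>();
        for (String line : csv.split("\\R")) {
            if (!line.isEmpty()) {
                rows.add(line.split(","));
            }
        }
        if (!rows.isEmpty()) {
            rows.remove(0); // omit the header, as the route's processor does
        }
        return rows;
    }

    public static void main(String[] args) {
        String csv = "Date,Symbol,Open,\n"
                + "2019-07-09,BTCUSD,12347.18\n"
                + "2019-07-08,BTCUSD,11475.07";
        for (String[] row : dataRows(csv)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

With the sample file above, two data rows remain, which is what the assertions on mock:output are meant to check.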

I also want to intercept the Exchange before it is persisted and send it to the endpoint mock:output, where I can run assertions.

Here is the test I wrote:


@RunWith(CamelSpringBootRunner.class)
@SpringBootTest
@MockEndpoints
@UseAdviceWith
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class LivePricesPersistenceRouteTest {

    @Autowired
    CamelContext camelContext;

    @EndpointInject(uri = "mock:output")
    private MockEndpoint mockOutput;

    @Test
    public void testSendLivePricesCsvToTopic() throws Exception {

        camelContext.getRouteDefinition("live-prices-persistence-route")
                .adviceWith(camelContext, new AdviceWithRouteBuilder() {

                    @Override
                    public void configure() throws Exception {
                        replaceFromWith("file://testCsvFile.csv");                      
                        intercept()
                                .to("mock:output");
                    }
                });

        camelContext.start();

        Exchange exchange = mockOutput.assertExchangeReceived(0);
        List<LivePrice> livePrices = (List<LivePrice>) exchange.getIn().getBody();
        assertThat(livePrices.get(0).getDate(), is("2019-07-09"));
        // TODO ADD MORE ASSERTIONS
        mockOutput.assertIsSatisfied();

    }
}

When I run the test, Camel logs the following:

2019-07-13 14:35:16.587  INFO 90356 --- [           main] org.apache.camel.model.RouteDefinition   : Adviced route before/after as XML:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="live-prices-persistence-route">
    <from uri="kafka:{{kafka.topic.live.prices.csv}}{{kafka.broker.location}}"/>
    <transacted>
        <unmarshal customId="true" id="convertToCsv">
            <bindy type="Csv"/>
        </unmarshal>
        <process/>
        <bean customId="true" id="populateLivePrices" method="populateLivePrices" ref="livePriceServiceImpl"/>
    </transacted>
</route>

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="live-prices-persistence-route">
    <from uri="file://testCsvFile.csv"/>
    <intercept>
        <to uri="mock:output"/>
    </intercept>
    <transacted>
        <unmarshal customId="true" id="convertToCsv">
            <bindy type="Csv"/>
        </unmarshal>
        <process/>
        <bean customId="true" id="populateLivePrices" method="populateLivePrices" ref="livePriceServiceImpl"/>
    </transacted>
</route>

But the test fails with the following output:

java.lang.AssertionError: mock://output Not enough messages received. Was: 0
    at org.apache.camel.component.mock.MockEndpoint.fail(MockEndpoint.java:1494)
    at org.apache.camel.component.mock.MockEndpoint.assertTrue(MockEndpoint.java:1482)
    at org.apache.camel.component.mock.MockEndpoint.assertExchangeReceived(MockEndpoint.java:1078)
    at com.xxx.liveprices.routes.LivePricesPersistenceRouteTest.testSendLivePricesCsvToTopic(LivePricesPersistenceRouteTest.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
    at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Can anyone explain why the file is not consumed, and why the Exchange is not intercepted and sent to the mock endpoint?

Best answer

After more reading, I still could not determine why the following code did not read my input file and turn it into an Exchange:

replaceFromWith("file://testCsvFile.csv");

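I chose instead to supply the contents of the CSV file as a String, replace the route's input with direct:test, and use weaveById to replace the persistence step with mock:output.

Here is the test that achieves my goal: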

@RunWith(CamelSpringBootRunner.class)
@SpringBootTest
@MockEndpoints
@UseAdviceWith
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class LivePricesPersistenceRouteTest {

    @Autowired
    CamelContext camelContext;

    @Autowired
    ProducerTemplate producerTemplate;

    @EndpointInject(uri = "mock:output")
    private MockEndpoint mockOutput;

    @Test
    public void testSendLivePricesCsvToTopic() throws Exception {

        camelContext.getRouteDefinition("live-prices-persistence-route")
                .adviceWith(camelContext, new AdviceWithRouteBuilder() {

                    @Override
                    public void configure() throws Exception {
                        replaceFromWith("direct:test");
                        weaveById("populateLivePrices").replace().inOut("mock:output");

                    }
                });

        camelContext.start();

        String message = "Date,Symbol,Open,\n" +
                "2019-07-09,BTCUSD,12347.18\n" +
                "2019-07-08,BTCUSD,11475.07";

        producerTemplate.sendBody("direct:test", message);

        Exchange exchange = mockOutput.assertExchangeReceived(0);
        List<LivePrice> livePrices = (List<LivePrice>)exchange.getIn().getBody();
        assertThat(livePrices.get(0).getDate(), is("2019-07-09"));
        assertThat(livePrices.get(0).getOpen(), is("12347.18"));

        assertThat(livePrices.get(1).getDate(), is("2019-07-08"));
        assertThat(livePrices.get(1).getOpen(), is("11475.07"));

        mockOutput.assertIsSatisfied();
    }
}
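
The answer leaves open why `replaceFromWith("file://testCsvFile.csv")` produced no Exchange. One thing worth checking, offered as an assumption rather than a diagnosis confirmed by the original poster: the Camel file component treats the path part of its URI as a directory to poll and selects a single file through the `fileName` option, so `file://testCsvFile.csv` would point at a directory of that name. The JDK-only sketch below writes the test CSV into a temporary directory and builds a URI of that shape. The class `TestCsvFileUri` and its helpers are hypothetical names, and whether this resolves the original test was not verified.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class TestCsvFileUri {

    /** Writes the CSV into a fresh temporary directory and returns the file's path. */
    static Path writeCsv(String fileName, String content) {
        try {
            Path dir = Files.createTempDirectory("live-prices-test");
            Path file = dir.resolve(fileName);
            Files.write(file, content.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds a file-component URI: the directory is the path, the file is picked via fileName. */
    static String toCamelFileUri(Path file) {
        String dir = file.getParent().toString().replace('\\', '/');
        // noop=true asks the consumer to leave the file in place after reading it
        return "file:" + dir + "?fileName=" + file.getFileName() + "&noop=true";
    }

    public static void main(String[] args) {
        Path file = writeCsv("testCsvFile.csv",
                "Date,Symbol,Open,\n"
                + "2019-07-09,BTCUSD,12347.18\n"
                + "2019-07-08,BTCUSD,11475.07");
        System.out.println(toCamelFileUri(file));
    }
}
```

The resulting string could be passed to `replaceFromWith(...)` in the advice. Because a file consumer polls asynchronously, it may also help to call `mockOutput.expectedMessageCount(1)` and then `mockOutput.assertIsSatisfied()` before `assertExchangeReceived(0)`, so the mock has time to receive the message.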

This question, "java - Camel advice not replacing the endpoint with the specified file", corresponds to a similar question found on Stack Overflow: https://stackoverflow.com/questions/57019776/
