java - Camel split 后异常不会升级

标签 java split apache-camel

我们在 Camel 中定义了一个路由,并且必须找出处理器中是否抛出异常。当我们只有一个处理器时,Camel 会在 sendBody() 方法中重新抛出异常。如果前面有拆分/聚合,则不会抛出异常。所以下面例子的结果是

before throwing Exception

after sendBody

如果我省略从 .split 到 .completionSize(1) 的所有内容,则输出为

before throwing Exception

Exception thrown

如果在拆分后发生异常,有什么想法可以查明吗?

private static final String DIRECT_START = "direct:start";

public static void main(String[] args) throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {

            from(DIRECT_START)
            .split(body())
                .aggregate(constant(true), new AggregationStrategy() {
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        return oldExchange == null ? newExchange : oldExchange;
                    }
                })
                .completionSize(1)
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    System.out.println("before throwing Exception");
                    exchange.setException(new Exception());
                    throw new Exception("my Exception");
                }
            });
        }});        

    context.start();

    ProducerTemplate producer = context.createProducerTemplate();
    try {
        producer.sendBody(DIRECT_START, Integer.valueOf(42));
        System.out.println("after sendBody");
    } catch (Exception e) {
        System.out.println("Exception thrown");
    }

    context.stop();

}

为了事后检查异常,我们找到了解决方案。我们向 onException() 注册了一个 ErrorProcessor,它将状态设置到上下文属性中。

但这不会中断 producer.sendBody(..)。我们有超长时间运行的处理器,我们必须中断它们。

所以问题是,我们可以配置 Camel 以在 sendBody 中抛出 Exception 还是可以在 Exceptionhandler 中执行此操作?

最佳答案

我强烈推荐在 Camel in Action(第 8.3.5 节)中有一个很好的章节介绍 Splitter EIP 和异常处理。该部分解释说:

When using a custom AggregationStrategy with the Splitter, it's important to know that you're responsible for handling exceptions. If you don't propagate the exception back, the Splitter will assume you have handled the exception, and ignore it.

您已经使用了 split()方法而不指定聚合器。在Camel documentation , 他们指定

The splitter will by default return the original input message

这意味着交换离开 split()方法没有异常,因此不会将异常传播回您的调用代码。您从处理器抛出的异常在技术上位于拆分器内部。即使您使用了聚合器,它也没有与 split 关联调用,并且您还没有明确结束 splitend() .因此,当您的处理器抛出异常时,拆分器会忽略它,因为您没有提供聚合器来处理和传播异常。

我们可以通过将您的聚合策略传递给 split 来对此进行测试像这样调用作为参数:

.split(body(), new AggregationStrategy() {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        System.out.println("Aggregating");
        return oldExchange == null ? newExchange : oldExchange;
    }
})
    .log("test") // inside the split/aggregator EIP
.end() // outside the split/aggregator EIP
.process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        System.out.println("before throwing Exception");
        throw new Exception("my Exception");
    }
});

你会得到输出:

test
Aggregating
before throwing Exception
Exception thrown

如果您希望处理器位于拆分/聚合器 EIP 内,如下所示:

.split(body(), new AggregationStrategy() {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        System.out.println("Aggregating");
        return oldExchange == null ? newExchange : oldExchange;
    }
})
    .process(new Processor() { // inside the split/aggregator EIP
        public void process(Exchange exchange) throws Exception {
            System.out.println("before throwing Exception");
            throw new Exception("my Exception");
        }
    })
.end(); // outside the split/aggregator EIP

你会得到输出:

before throwing Exception
Aggregating
Exception thrown

请注意,在拆分/聚合器 EIP 中,聚合器是如何在抛出异常后运行的?这很重要,因为没有聚合器传递异常,拆分器将忽略它。为了使其工作,您需要在聚合器内正确传播您的异常。例如,在您的代码中,如果 newExchange是包含一个异常,它会被忽略,因为你没有传播它。您需要更改聚合器以添加:

if (newExchange.getException() != null) {
    oldExchange.setException(newExchange.getException());
}

注意:如果您有 onException()在split EIP内部调用,设置异常处理中,调用getException()时不再返回.所以如果你想处理你的异常,但仍然通过聚合器传播它们,你可以使用 exchange.getProperty(Exchange.EXCEPTION_CAUGHT);

您还可以使用 .stopOnException() ,像这样:

.split(body()).stopOnException()
    .process(new Processor() { // inside the split/aggregator EIP
        public void process(Exchange exchange) throws Exception {
            System.out.println("before throwing Exception");
            throw new Exception("my Exception");
        }
    });

这会导致拆分在出现异常时停止,并传播它。但是,当您在 stopOnException() 之后放置聚合器时,它不再有效。我不完全确定为什么。我猜这是因为聚合器改变了 exchange对象。

另请注意,您无需在处理器中为交易所设置异常(exception)。当处理器抛出异常时,Camel 会为你做这件事。所以行 exchange.setException(new Exception());在处理器中是没有必要的。

tl;dr 所以是的,您可以从拆分内部将异常传播到调用方法。您只需要确保它是通过与拆分关联的聚合器或设置 stopOnException() 完成的。 .这取决于您尝试通过拆分/聚合/处理实现的最佳方法是什么。

关于java - Camel split 后异常不会升级,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35338736/

相关文章:

java - 如何从camel路由中的camel上下文获取属性值

java - ImageJ:获取特定的Radiobox

java - 创建描边形状

java - 如何在Windows中向临时文件夹写入和读取文件

r - 拆分字符串并检查所有元素在 R 中是否唯一

list - 在指定元素处拆分列表

java - mysql批量更新限制

python - 使用 shlex 拆分多行字符串并保留引号字符

java - Camel 作为 JMS 客户端

java - 访问 Camel Freemarker ftl 中的 RequestParameters