spring - MQTT Apache Camel 路由错误导致应用程序停止

标签 spring apache-camel jms mqtt

我正在使用 Apache Camel 和 Spring(不是 Spring Boot)和 xml。

我有我的 camel-context.xml 配置文件,其中包含从 MQTT 服务器到 JMS 服务器以及反之亦然的示例路由,只是为了发送消息。

这是我的 camel-context.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="quadtreeProcessor" class="es.gateway.router.QuadtreeProcessor" />

    <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
        <route id="quadTreeConsumerProducer">
            <from uri="jms:topic:T_ETSI_PRODUCER"/>
            <process ref="quadtreeProcessor"/>
            <to uri="mqtt:quadtree?host=tcp://localhost:1883&amp;publishTopicName=${header.publishTopicName}"/>
        </route>

        <route id="quadTreeConsumerRoute">
            <from uri="mqtt:quadtree?host=tcp://localhost:1883&amp;subscribeTopicName=CONSUMER/DENM/#"/>
            <to uri="jms:topic:T_ETSI_CONSUMER"/>
        </route>
    </camelContext>

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:10011"/>
    </bean>
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" />

这是我的 Main.class:

package es.conncar.main;

import org.apache.camel.RuntimeCamelException;
import org.apache.camel.main.MainListener;
import org.apache.camel.main.MainListenerSupport;
import org.apache.camel.main.MainSupport;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import es.gateway.config.Config;

public class Main {

    final static Logger logger = Logger.getLogger(Main.class);

    private static boolean exit = false;

    private org.apache.camel.spring.Main camelMain;

    public static void main(String[] args) throws Exception {
        try {
            ApplicationContext springContext = new ClassPathXmlApplicationContext("app-context.xml");
            logger.info("Spring context initialized");

            Config config = (Config) springContext.getBean("config");

            Main main = new Main();
            main.boot();
        } catch (Exception e) {
            logger.error("Unknown error in main", e);
        }
    }

    public void boot() throws Exception {
        camelMain = new org.apache.camel.spring.Main();
        camelMain.addMainListener((MainListener) new Events());
        camelMain.setApplicationContextUri("camel-context.xml"); 
        logger.info("Starting Camel. Use ctrl + c to terminate the JVM.\n");
        camelMain.run();
    }

    public static class Events extends MainListenerSupport {

        @Override
        public void afterStart(MainSupport main) {
            logger.info("Camel is now started!");
        }

        @Override
        public void beforeStop(MainSupport main) {
            logger.info("Camel is now being stopped!");
        }
    }
}

在我的主类中运行相同的路由和配置时出现问题,该类在几秒后因 RuntimeCamelException 而终止。

异常跟踪是:

Camel 启动正常

[30/11/2018 08:37:08][INFO ][es.gateway.main.Main] - Spring context initialized
[30/11/2018 08:37:08][INFO ][es.gateway.main.Main] - Starting Camel. Use ctrl + c to terminate the JVM. 

10 秒后

[30/11/2018 08:37:21][ERROR][es.gateway.main.Main] - Unknown error in Camel
    org.apache.camel.RuntimeCamelException: java.util.concurrent.TimeoutException
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1830)
        at org.apache.camel.spring.SpringCamelContext.start(SpringCamelContext.java:136)
        at org.apache.camel.spring.CamelContextFactoryBean.start(CamelContextFactoryBean.java:369)
        at org.apache.camel.spring.CamelContextFactoryBean.onApplicationEvent(CamelContextFactoryBean.java:416)
        at org.apache.camel.spring.CamelContextFactoryBean.onApplicationEvent(CamelContextFactoryBean.java:94)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:393)
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:347)
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:883)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:93)
        at org.apache.camel.spring.Main.createDefaultApplicationContext(Main.java:222)
        at org.apache.camel.spring.Main.doStart(Main.java:154)
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
        at org.apache.camel.main.MainSupport.run(MainSupport.java:170)
        at es.gateway.main.Main.boot(Main.java:105)
        at es.gateway.main.Main.main(Main.java:77)
    Caused by: java.util.concurrent.TimeoutException
        at org.fusesource.mqtt.client.Promise.await(Promise.java:83)
        at org.apache.camel.component.mqtt.MQTTEndpoint.connect(MQTTEndpoint.java:348)
        at org.apache.camel.component.mqtt.MQTTConsumer.doStart(MQTTConsumer.java:38)
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
        at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:3705)
        at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRouteConsumers(DefaultCamelContext.java:4023)
        at org.apache.camel.impl.DefaultCamelContext.doStartRouteConsumers(DefaultCamelContext.java:3958)
        at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3878)
        at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3642)
        at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3494)
        at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:209)
        at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3253)
        at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3249)
        at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3272)
        at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3249)
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3165)
        at org.apache.camel.spring.SpringCamelContext.start(SpringCamelContext.java:133)
        ... 18 more

看起来路由在无法连接到 MQTT 代理时抛出超时(没关系),但此异常并未在路由中捕获,而是在上下文 - 主类中捕获(这不行)。

我遵循了这本食谱:

http://camel.apache.org/running-camel-standalone-and-have-it-keep-running.html

org.apache.camel.spring.Main 类中使用 camel-spring JAR

我还查看了 Apache Camel in Action 一书中的第 13 章,但我没有找到解决方案。看来我正在开始并配置 camel 上下文就好了。

谁有这方面的经验?当路由中出现 RuntimeExceptions 时,有没有办法让路由正常工作并使主程序保持事件状态?我希望如此!

提前致谢!

编辑:我发现有几个话题在谈论这个。该解决方案似乎正在激活监督 Controller ,但我找不到没有 Spring Boot 的方法(我使用的是带有混合 xml 和注释配置的普通 Spring)。有人可以帮忙吗?

How to start camel even if the MQTT server is not reachable

Spring Context shutting down camel routes when activemq connection is lost

EDIT2:我检查过问题仅出在 MQTT。 JMS 连接正常工作,即使在 JMS 代理关闭时也能继续连接并优雅地处理重新连接,而 MQTT 连接则不行。

EDIT3:Paho 工作正常但普通 MQTT 不行。我已经在 xml 配置 (uri) 中使用 Paho 测试了相同的代码,它的行为符合预期,类似于 JMS。该路由抛出异常但继续前进,它不会引发导致应用程序停止的上下文的异常。也许我在 MQTT 客户端中缺少选项?

最佳答案

我建议使用 camel-paho,因为 Eclipse Paho 维护得更活跃 比 camel-mqtt 使用的旧 FuseSource MQTT 客户端库要好。

也就是说,camel-mqtt 组件的问题在于它在启动时需要有效连接。您可以配置其重新连接选项以设置各种延迟等。

还有一个替代的路由启动机制,它允许使用监控路由并处理重试等的后台线程启动路由。它是 SupervisingRouteController,请注意还有一些事情需要实现围绕其 JMX 管理功能,但在其他方面应该没问题。而且它缺乏更合适的文档。我们正在考虑使其在 Camel 3 中更加突出或默认。

这里有一个例子:https://github.com/apache/camel/tree/master/examples/camel-example-spring-boot-supervising-route-controller

关于spring - MQTT Apache Camel 路由错误导致应用程序停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53546604/

相关文章:

java - 将字符串映射到类(MapStruct)时出现问题

java - 有没有办法将实体加载到派生类的实例中?

java - 集成 springMVC 和 extjs

apache-camel - Apache Camel : Route from camel tomcat servlet to cxfbean component

java - 如何获取Cxfrs :server input and output types

java - Log4j JMS 附加程序示例

symfony - 为 InvalidConfigurationException 配置 jms_i18n_routing

java - 使用 Spring Security 当用户不存在时如何显示错误消息?

java - Apache Camel,将文件移动到 .camel 的重新排序器

java - 运行多个 Spring Boot 测试时,@MockBean 在 JMS 监听器中使用不同的实例