java - Spring MqttPahoMessageDrivenChannelAdapter loses connection: Connection lost; retrying

Tags: java spring spring-integration mqtt paho

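We are using the Spring message-driven-channel-adapter to subscribe to an MQTT topic, but we frequently hit the error below. I tested the connection with a JavaScript client (mqttws31.js) and it works fine, which suggests the connection itself is not the problem.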


org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter connectionLost
SEVERE: Lost connection:Connection lost; retrying...

MQTT message:

[payload=6483D03E4C75BA943148F18D73,1.00,1E, headers={mqtt_retained=false, mqtt_qos=0, 
id=5fa41168-34c6-1e3d-a775-e3146842990a, mqtt_topic=TEST/GATEWAY2, mqtt_duplicate=false, timestamp=1499067757559}]


<bean id="clientFactory"
    class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="userName" value="${mqtt.username}" />
    <property name="password" value="${mqtt.password}" />
</bean>

<int-mqtt:message-driven-channel-adapter
    id="mqttInbound" client-id="${}" url="${mqtt.url}"
    topics="${topics}" client-factory="clientFactory" auto-startup="true"
    channel="output" error-channel="errorChannel" />

<int:channel id="output" />
<int:channel id="errorChannel" />

<int:service-activator input-channel="errorChannel"
    ref="errorMessageLogger" method="logError" />
<bean id="errorMessageLogger" class="com.mqtt.ErrorMessageLogger" />

<int:service-activator input-channel="output"
    method="handleMessage" ref="mqttLogger" />
<bean id="mqttLogger" class="com.mqtt.MqttReciever" />

pom.xml :


While debugging org.eclipse.paho.client.mqttv3-1.1.1-sources.jar:


public void run() {
    final String methodName = "run";
    MqttToken token = null;

    while (running && (in != null)) {
        try {
            //@TRACE 852=network read message
            receiving = in.available() > 0;
            MqttWireMessage message = in.readMqttWireMessage();
            receiving = false;

            // instanceof checks if message is null
            if (message instanceof MqttAck) {
                token = tokenStore.getToken(message);
                if (token!=null) {
                    synchronized (token) {
                        // Ensure the notify processing is done under a lock on the token
                        // This ensures that the send processing can complete  before the
                        // receive processing starts! ( request and ack and ack processing
                        // can occur before request processing is complete if not!
                    }
                } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
                    //This is an ack for a message we no longer have a ticket for.
                    //This probably means we already received this message and it's being send again
                    //because of timeouts, crashes, disconnects, restarts etc.
                    //It should be safe to ignore these unexpected messages.
                    log.fine(CLASS_NAME, methodName, "857");
                } else {
                    // It its an ack and there is no token then something is not right.
                    // An ack should always have a token assoicated with it.
                    throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                }
            } else {
                if (message != null) {
                    // A new message has arrived
                }
            }
        }
        catch (MqttException ex) {
            //@TRACE 856=Stopping, MQttException
            running = false;
            // Token maybe null but that is handled in shutdown
            clientComms.shutdownConnection(token, ex);
        }
        catch (IOException ioe) {
            //@TRACE 853=Stopping due to IOException

            running = false;
            // An EOFException could be raised if the broker processes the
            // DISCONNECT and ends the socket before we complete. As such,
            // only shutdown the connection if we're not already shutting down.
            if (!clientComms.isDisconnecting()) {
                clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
            }
        }
        finally {
            receiving = false;
        }
    }

    //@TRACE 854=<
}

In the method above, in.readMqttWireMessage() sometimes throws an IOException. As a result, the catch block calls clientComms.shutdownConnection(token, ...
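Because the catch block wraps the IOException as the cause of the MqttException, the innermost cause is usually the most informative thing to log. For example, an EOFException there typically means the other side closed the socket. The following is only a minimal sketch using the standard library: RootCause and rootCause are hypothetical names, and a plain RuntimeException stands in for the MqttException.

```java
import java.io.EOFException;

public class RootCause {
    /** Follows getCause() links until the innermost exception is reached. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for the MqttException that wraps the IOException in the catch block.
        Exception wrapped = new RuntimeException("Connection lost", new EOFException());
        // Prints the class of the innermost cause: java.io.EOFException
        System.out.println(rootCause(wrapped).getClass().getName());
    }
}
```

Logging the result of a helper like this from the error handler or a connection-lost callback would show whether the drop comes from a closed socket, a timeout, or something else.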



Just sharing in case it helps... I hit the same exception and fixed it by making sure a unique client ID is generated (using MqttAsyncClient.generateClientId()), as described here:
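A likely reason this helps: an MQTT broker disconnects an existing session when another client connects with the same client ID, so two clients sharing one ID (for example two application instances, or the application plus a test client) keep knocking each other off. If a recognizable prefix is preferred over the value returned by MqttAsyncClient.generateClientId(), a rough alternative is sketched below. ClientIds and uniqueClientId are hypothetical names, and the sketch assumes the prefix contains only letters and digits.

```java
import java.util.UUID;

public class ClientIds {
    // MQTT 3.1 limits client IDs to 23 characters, so the sketch stays within that.
    static final int MAX_LENGTH = 23;

    /** Hypothetical helper: a short prefix followed by random hex, 23 characters in total. */
    static String uniqueClientId(String prefix) {
        String random = UUID.randomUUID().toString().replace("-", ""); // 32 hex characters
        // Keep at most 10 characters of the prefix so at least 13 random characters remain.
        String safePrefix = prefix.length() > 10 ? prefix.substring(0, 10) : prefix;
        return safePrefix + random.substring(0, MAX_LENGTH - safePrefix.length());
    }

    public static void main(String[] args) {
        // Two calls give two different IDs that share the same prefix.
        System.out.println(uniqueClientId("gateway"));
        System.out.println(uniqueClientId("gateway"));
    }
}
```

The generated value would then be supplied as the adapter's client-id in place of a fixed string, so that each running instance connects under its own ID.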


