Spring集成MQTT发布订阅多个主题

标签 spring spring-integration publish-subscribe mqtt

我正在尝试构建一个应用程序,它订阅多个 mqtt 主题,获取信息,处理它并形成 xml,并在处理时触发一个事件,以便这些可以发送到某个云服务器,并从那里成功响应发送回mqtt channel 。

<int-mqtt:message-driven-channel-adapter
    id="mqttAdapter" client-id="${clientId}" url="${brokerUrl}" topics="${topics}"
    channel="startCase" auto-startup="true" />

<int:channel id="startCase" />

<int:service-activator id="startCaseService"
        input-channel="startCase" ref="msgPollingService" method="pollMessages" />

    <bean id="mqttTaskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="5" />
        <property name="maxPoolSize" value="10" />
    </bean>

    <bean id="msgPollingService" class="com.xxxx.xxx.mqttclient.mqtt.MsgPollingService">
        <property name="taskExecutor" ref="mqttTaskExecutor" />
        <property name="vendorId" value="${vendorId}" />
    </bean>

我的问题是如何将其发布到多个 channel ,即我是否可以选择将 X 消息发布到 Y 主题。目前我有以下内容:

<int:channel id="outbound" />

<int-mqtt:outbound-channel-adapter
    id="mqtt-publish" client-id="kj" client-factory="clientFactory"
    auto-startup="true" url="${brokerUrl}" default-qos="0"
    default-retained="true" default-topic="${responseTopic}" channel="outbound" />

    <bean id="eventListner" class="com.xxxx.xxxx.mqttclient.event.EventListener">
        <property name="sccUrl" value="${url}" />
        <property name="restTemplate" ref="restTemplate" />
        <property name="channel" ref="outbound" />
    </bean>

我可以这样发布:

channel.send(MessageBuilder.withPayload("customResponse").build());

我可以做这样的事情:

channel.send(Message<?>, topic)

最佳答案

你的配置看起来不错。然而MessageChannel是松散耦合的抽象,只能处理 Message

所以,你请求 a-la channel.send(Message<?>, topic)对于消息传递概念来说不正确。

但是我们有一个技巧给你。来自 AbstractMqttMessageHandler :

String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
.....
this.publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);

所以,您需要从代码中得到的是:

channel.send(MessageBuilder.withPayload("customResponse").setHeader(MqttHeaders.TOPIC, topic).build());

换句话说,您应该发送 Messagemqtt_topic header实现动态发布<int-mqtt:outbound-channel-adapter> .

从另一方面来看,我们不建议使用 MessageChannel直接来自应用程序。 <gateway>带有服务接口(interface)就是针对最终应用程序的这种情况。那在哪里topic可以是标记为 @Header(MqttHeaders.TOPIC) 的服务方法参数之一

关于Spring集成MQTT发布订阅多个主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28313985/

相关文章:

scala - tomcat 网络套接字 : cannot connect to tomcat server

c# - Nservicebus 不在 msmq 中存储订阅者

android - 使用MQTT协议(protocol)的android消息系统的可行性

java - 如何使用ajax文件上传和spring mvc上传文件?

java - spring 如何将参数注入(inject)到 Controller 方法中

java - Spring Boot 集成 - 在连接初始化时发送问候

java - Spring集成+Spring AMQP : How can I passing MessageProperties to int:service-activator?

java - 对运行不同进程的同一应用程序使用不同的 log4j 配置文件

java - 从基于配置的服务 bean 切换到基于注释的服务 bean 时的 TransactionProxyFactoryBean

java - maprsteam 与 spring 集成 java 客户端