我是 Spring Boot 新手,正在尝试使用 Spring 集成中的示例来使用 MQTT 进行订阅和发布。我设法将其与 Thingsboard 集成,并且下面代码中的记录器能够接收来自 Thingsboard 的已发布消息。
public static void main(String[] args) {
SpringApplication.run(MqttTest.class);
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "URI HERE" });
options.setUserName("ACCESS TOKEN HERE");
factory.setConnectionOptions(options);
return factory;
}
// consumer
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p)
.handle(logger())
.get();
}
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("LoggerBot");
return loggingHandler;
}
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("Consumer",
mqttClientFactory(), "v1/devices/me/rpc/request/+");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
这是控制台输出。我能够接收从 thingsboard 仪表板发送的已发布 json 消息。我想知道是否有一个调用方法来检索 json 消息字符串,以便我可以进一步处理它。谢谢。
2019-02-01 14:06:23.590 INFO 13416 --- [ Call: Consumer] LoggerBot : {"method":"setValue","params":true}
2019-02-01 14:06:24.840 INFO 13416 --- [ Call: Consumer] LoggerBot : {"method":"setValue","params":false}
最佳答案
要处理已发布的消息,请将消息句柄订阅到流以使用消息。
消息处理程序
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p)
.handle( mess -> {
System.out.println("mess"+mess);
})
.get();
}
服务激活器
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p)
.handle("myService","handleHere")
.handle(logger())
.get();
}
@Component
public class MyService {
@ServiceActivator
public Object handleHere(@Payload Object mess) {
System.out.println("payload "+mess);
return mess;
}
}
注意:正如我们所讨论的,有很多不同的方法可以实现它。 这只是一个供您理解的示例。
关于java - Spring集成(MQTT): Retrieving published message,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54474014/