java - Spring集成(MQTT): Retrieving published message

标签 java spring spring-boot mqtt

我是 Spring Boot 新手,正在尝试使用 Spring 集成中的示例来使用 MQTT 进行订阅和发布。我设法将其与 Thingsboard 集成,并且下面代码中的记录器能够接收来自 Thingsboard 的已发布消息。

public static void main(String[] args) {
    SpringApplication.run(MqttTest.class);
}

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] { "URI HERE" });
    options.setUserName("ACCESS TOKEN HERE");
    factory.setConnectionOptions(options);
    return factory;
}

// consumer

@Bean
public IntegrationFlow mqttInFlow() {
    return IntegrationFlows.from(mqttInbound())
            .transform(p -> p)
            .handle(logger())
            .get();
}

private LoggingHandler logger() {
    LoggingHandler loggingHandler = new LoggingHandler("INFO");
    loggingHandler.setLoggerName("LoggerBot");
    return loggingHandler;
}

@Bean
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("Consumer",
            mqttClientFactory(), "v1/devices/me/rpc/request/+");
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
}

这是控制台输出。我能够接收从 thingsboard 仪表板发送的已发布 json 消息。我想知道是否有一个调用方法来检索 json 消息字符串,以便我可以进一步处理它。谢谢。

2019-02-01 14:06:23.590  INFO 13416 --- [ Call: Consumer] LoggerBot : {"method":"setValue","params":true}
2019-02-01 14:06:24.840  INFO 13416 --- [ Call: Consumer] LoggerBot : {"method":"setValue","params":false}

最佳答案

要处理已发布的消息,请将消息句柄订阅到流以使用消息。

消息处理程序

    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p)
                .handle( mess -> {
                   System.out.println("mess"+mess);
                 })            
                .get();
    }

服务激活器


    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p)
                .handle("myService","handleHere")
                .handle(logger())
                .get();
    }

@Component
public class MyService {

    @ServiceActivator
    public Object handleHere(@Payload Object mess) {
        System.out.println("payload "+mess);
        return mess;
    }
}

注意:正如我们所讨论的,有很多不同的方法可以实现它。 这只是一个供您理解的示例。

关于java - Spring集成(MQTT): Retrieving published message,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54474014/

相关文章:

java - 通过docker compose制作镜像自动启动Spring Boot和Cassandra

java - Quarkus:对以字符串形式给出的 URL 的请求

java - 从另一个类调用变量时是否可以避免使用 "static"?

java - Spring 安全: Login authentication controller

java - 考虑在配置中定义一个名为 'entityManagerFactory' 的 bean - Spring boot

java - joinPoint.proceed() 有什么作用?

java - 检测到预设 ID 为 "detached entity"的新实体

java - JAR可运行文件无法在其他PC上执行

java - spring boot中如何访问非主DataSource?

java - 如何从 Maven 中的另一个依赖项添加源?