java - Spring Integration 手动发布消息到 channel

标签 java spring spring-integration

我正在学习如何使用 Java Spring 框架并开始试验 Spring Integration。我正在尝试使用 Spring Integration 将我的应用程序连接到 MQTT 代理以发布和订阅消息,但我无法找到手动将消息发布到出站 channel 的方法。如果可能的话,我想专门使用 java 代码中的符号而不是定义 bean 和其他相关配置的 xml 文件来构建它。

在每个示例中,我都看到手动发布消息的解决方案似乎是使用 MessagingGateway 接口(interface),然后使用 SpringApplicationBuilder 获取 ConfigurableApplicationContext 以在 main 方法中获取对网关接口(interface)的引用。该引用然后用于发布消息。是否可以将 AutoWired 用于接口(interface)?在我的尝试中,我只得到一个 NullPointer。

我的目标是构建一个游戏,在其中我订阅一个主题以获取游戏消息,然后每当用户准备好采取下一步行动时,向该主题发布一条新消息。

更新: 这是我一直在研究如何设置出站 channel 的示例之一:https://docs.spring.io/spring-integration/reference/html/mqtt.html

在 Gary Russel 回答后更新 2:

这是我在查看示例后编写的一些示例代码,这些示例在 Controller.java 中运行 gateway.sendToMqtt 时将 @AutoWired 用于网关时让我得到一个 NullPointer。我在这里想要实现的是在 Controller 处理 GET 请求时手动发送一条 mqtt 消息。

应用程序.java

@SpringBootApplication
public class Application {

    public static void main(String[] args){
        SpringApplication.run(Application.class, args);
    }
}

Controller .java

@RestController
@RequestMapping("/publishMessage")
public class Controller {

    @Autowired
    static Gateway gateway;

    @RequestMapping(method = RequestMethod.GET)
    public int request(){
        gateway.sendToMqtt("Test Message!");
        return 0;
    }
}

MqttPublisher.java

@EnableIntegration
@Configuration
public class MqttPublisher {
    @Bean
    public MqttPahoClientFactory mqttClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(){
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("clientPublisher", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("topic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface Gateway {

        void sendToMqtt(String data);
    }
}

更新:

不确定这是否是正确的日志记录,但这是我通过添加得到的:

logging.level.org.springframework.web=Debug
logging.level.org.hibernate=Error

到 application.properties。

https://hastebin.com/cuvonufeco.hs

最佳答案

使用 Messaging Gateway或者简单地向 channel 发送消息。

编辑

@SpringBootApplication
public class So47846492Application {

    public static void main(String[] args) {
        SpringApplication.run(So47846492Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(MyGate gate) {
        return args -> {
            gate.send("someTopic", "foo");
            Thread.sleep(5_000);
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "toMqtt")
    public MqttPahoMessageHandler mqtt() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("tcp://localhost:1883", "foo",
                clientFactory());
        handler.setDefaultTopic("myTopic");
        handler.setQosExpressionString("1");
        return handler;
    }

    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setUserName("guest");
        factory.setPassword("guest");
        return factory;
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttIn() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "bar", "someTopic");
        adapter.setOutputChannelName("fromMqtt");
        return adapter;
    }

    @ServiceActivator(inputChannel = "fromMqtt")
    public void in(String in) {
        System.out.println(in);
    }

    @MessagingGateway(defaultRequestChannel = "toMqtt")
    public interface MyGate {

        void send(@Header(MqttHeaders.TOPIC) String topic, String out);

    }

}

关于java - Spring Integration 手动发布消息到 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47846492/

相关文章:

java - 我想创建一个运算符来除以 Problem1 : ("+6x" ,"2"), get ("+3x");问题2 : ("12" ,"2"), 得到("6")。

java - 如何定义一个 "Savable"接口(interface),允许实现类通过任何所需的方法保存?

java - 当 WebMvcAutoConfiguration$EnableWebMvcConfiguration 只需要一个 entityManager 时如何在 Spring Boot 中进行 Multi-Tenancy

java - 在 Spring 发送电子邮件

java - Spring Integration DSL,从消息 channel 轮询

java - Selenium 单击有时会导致页面加载,有时不会

java - 日期未正确插入数据库

java - 如何访问 int-xml :xslt-transformer tag? 的多个 xsl 资源

exception-handling - Spring Integration - 在服务激活器组件中发生异常时写入错误队列

java - Hibernate 实体扩展基类,为实体形成的表没有基类中的属性列