java - 整合Spring Boot和Reactor-Kafka的KafkaReceiver

标签 java spring spring-boot project-reactor spring-kafka

我正在尝试使用库 reactor-kafka 开发一个 Spring Boot 应用程序,以对从 Kafka 主题读取的一些消息使用react。

我有一个构建KafkaReceiver 的配置类。

@Configuration
public class MyConfiguration {

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        // Options initialisation...
        final ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, string>create(props)
                               .subscription(Collections.singleton(consumer.getTopic()));
        return KafkaReceiver.create(receiverOptions);
    } 
}

那么……现在呢?使用不太 react 的 spring-kafka 库,我可以用 @KafkaListener 注释一个方法,Spring Boot 会为我创建一个从 Kafka 主题监听的线程。

我应该把 KafkaReceiver 放在哪里?在所有示例中,我发现直接使用 main 方法,但这不是 Boot 方式

我正在使用 Spring Boot 2.1.3 和 Reactor-Kafka 1.1.0

提前致谢。

最佳答案

既然你有那个 KafkaReceiver bean,现在你可以这样做:

@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
        return args -> {
                kafkaReceiver.receive()
                          ...
                          .sunbscribe();
        };
}

ApplicationRunner bean 将在 ApplicationContext 准备就绪时被踢出。有关详细信息,请参阅其 JavaDocs。

关于java - 整合Spring Boot和Reactor-Kafka的KafkaReceiver,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55242567/

相关文章:

spring - 如何知道当前 Web 访问者是否使用 Spring Security 3.0 登录

spring - 使用 Spring @Procedure 调用存储过程而不绑定(bind)到表

java - JHipster Spring 启动失败,出现 Logback 配置错误

java - 如何通过 Jenkins 运行 jar 文件?

java - NoDefFound 错误

java - 如何在 MongoDB Java 驱动程序中将动态字段插入到文档中

java - 如何阻止 hibernate 工具实现 Serialized 接口(interface)

java - spring-boot Amazon Elastic Beanstalk 忽略 'SPRING_APPLICATION_JSON'

java - 从单个 tomcat 上运行的多个应用程序关闭 spring boot 应用程序

java - 需要帮助理解集合在 lambda 语句中的使用