java - 如何从环境变量将动态主题名称传递给@KafkaListener(topics)

标签 java spring-boot kafka-consumer-api spring-kafka spring-el

我正在编写一个 Kafka 消费者。我需要将环境变量主题名称传递给 @KafkaListener(topics = ...)。这是我到目前为止所尝试过的:

 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener; 
 import org.springframework.stereotype.Service;

 @Service
 public class KafkaConsumer {

     @Autowired
     private EnvProperties envProperties;

     private final String topic = envProperties.getTopic();

     @KafkaListener(topics = "#{'${envProperties.getTopic()}'}", groupId = "group_id")
     public void consume(String message) {
        logger.info("Consuming messages " +envProperties.getTopic());
     }
}

我在 topics = "#{'${envProperties.getTopic()}'}" 行收到错误,应用程序无法启动。

如何从环境变量动态设置此主题名称?

最佳答案

通常,您不能引用声明 SpEL 的 bean 中的字段或属性。但是,@KafkaListener 有特殊的语法来支持它。

See the documentation .

Starting with version 2.1.2, the SpEL expressions support a special token __listener which is a pseudo bean name which represents the current bean instance within which this annotation exists.

因此,如果您将 public EnvProperties getEnvProperties() 添加到类中,则类似

#{__listener.envProperties.topic}

应该可以。

关于java - 如何从环境变量将动态主题名称传递给@KafkaListener(topics),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54038928/

相关文章:

java - Spring Data Neo4j 未能通过 junit 测试设置

java - 我如何找到验证我的 JDK 安全提供程序的加密强度?

java - Spring Boot 使用 1.5.6 发布重新打包的子模块 jar

java - 忽略在 yaml 配置中为 spring boot admin 应用程序指定的斜杠字符

java - 在 Kafka 中使用实时消息

尝试从 aws lambda 处理函数调用 SDN 时出现 java.lang.NullPointerException

java - 创建表后立即插入行 - Oracle

java - 如何在 Spring Boot 中设置基于预身份验证 header 的身份验证?

node.js - kafka-node 异步消费者处理程序

apache-kafka - 卡夫卡消费者 : Want to read same message again if not committed previous messages offset and auto commit is disabled