java - 如何使用工厂为特定主题配置 Spring Kafka Listener?

标签 java spring apache-kafka kafka-consumer-api spring-kafka

我希望能够通过属性读取主题,而无需在 Kafka 监听器注释上指定任何内容。不使用 Spring Boot。

我尝试通过“topics”键直接从属性对象读取主题。这会产生错误: IllegalStateException:必须提供 topics、topicPattern 或 topicPartitions。

// some class
@KafkaListener
public void listener(List<String> messages) {
  System.out.print(messages);
}

//some other class
@Bean
public ConsumerFactory<String, String> consumerFactory(Properties topicProp) {
  return new DefaultKafkaConsumerFactory(topicProp);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

  Properties prop = new Properties();
  prop.setProperty("topics", "my-custom-topic");

  factory.setConsumerFactory(this.consumerFactory(prop));
  return factory;
}

Is this possible?

最佳答案

您可以在主题中引用其他bean(或bean上的方法)

@Bean
public String topicName() {
    return "my-custom-topic";
}

...

@KafkaListener(topics = "#{@topicName}")
...

@KafkaListener(topics = "#{@someBean.someMethod()}")

关于java - 如何使用工厂为特定主题配置 Spring Kafka Listener?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57064593/

相关文章:

java - Spring 显示用户刚刚上传的图片,无需重启服务器

java - 不要坚持主题卡夫卡

java - JNI 中的回调帮助

java - 如何将 H2 控制台连接到嵌入式 Spring H2 DB

java - 来自客户端 Java 端的 GWT 查询

java - 如何读取flink中的前N条kafka消息?

apache-spark - spark.streaming.kafka.maxRatePerPartition 如何与 spark.streaming.backpressure.enabled incase 与 Kafka 进行 Spark 流相关?

java - 我有一个 if 语句问题

java - 如何在不使用@transactional注解的情况下进行事务管理

java.lang.NoClassDefFoundError : org/apache/http/impl/conn/PoolingClientConnectionManager 错误