java - 我如何使用 Kafka Consumer 的动态主题

标签 java spring-boot kafka-consumer-api

下面给出 Kafka 的配置来定位服务器 @ bean 公共(public) ConsumerFactory ConsumerFactory() { 映射配置 = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config);
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
    ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}



here I need to change the topic as a dynamic content
@KafkaListener(topics = "mytopic")
private void consume2(String Data) {
    String userID = null;
    String topic = null;
    String message = null;
    System.out.println(Data);
}

最佳答案

你有没有尝试过类似的事情

@KafkaListener(topics = "${kafka.topics}")

并将其设置为环境变量或应用程序属性?

环境变量KAFKA_TOPICS=mytopic application.properties kafka.topics=mytopic

关于java - 我如何使用 Kafka Consumer 的动态主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58732155/

相关文章:

java - 将字符串的ArrayList对齐到(custom)Box中将无法正确定位

java - 如何将 JDBI 添加到 Spring Boot 应用程序中?

amazon-web-services - ECS 服务的 AWS 网络负载均衡器健康检查失败

java - 在Camel-Kafka中创建并发Kafka消费者

java - "package"和 "module"有什么区别?

java - Gradle 不会导入 bintray 依赖项,但不会抛出任何错误

java - nullpointerexception array = array.clone 不起作用?

java - javax.validation.constraints 的注释不起作用(忽略)

multithreading - Kafka 高级消费者 : Can partitions have multiple threads consuming it?

java - Java Kafka Consumer 在多线程中的使用