java - 通过 Spring-Kafka 列出 Kafka 主题

标签 java apache-kafka kafka-consumer-api spring-kafka

我们想通过 spring-kafka 列出所有 Kafka 主题,以获得类似于 kafka 命令的结果:

bin/kafka-topics.sh --list --zookeeper localhost:2181

在下面的服务中运行 getTopics() 方法时,我们得到 org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

配置:

@EnableKafka
@Configuration
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
}

服务:

@Service
public class TopicServiceKafkaImpl implements TopicService {
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Override
    public Set<String> getTopics() {
        try (Consumer<String, String> consumer = 
            consumerFactory.createConsumer()) {
            Map<String, List<PartitionInfo>> map = consumer.listTopics();
            return map.keySet();
    }
}

Kafka 已启动并正在运行,我们可以从我们的应用程序成功向主题发送消息。

最佳答案

您可以使用 Admin Client 列出这样的主题

    Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    AdminClient adminClient = AdminClient.create(properties);

    ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
    listTopicsOptions.listInternal(true);

    System.out.println("topics:" + adminClient.listTopics(listTopicsOptions).names().get());

关于java - 通过 Spring-Kafka 列出 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53527808/

相关文章:

java - 有没有办法向 InitialLdapContext 提供 SocketFactory_instance_?

java - 如何修复错误 : could not find the required version of the Java(TM) 2 runtime environment in '(null)'

java - 压缩日志在 Kafka 中保存多长时间?

java - 执行 kafka-console-consumer.sh 时,zookeeper 不是可识别的选项

java - KafkaConsumer assignment() 返回空

java - libsvm java 中的交叉验证准确性

java - 无法使用 Selenium WebDriver 清除自动输入清晰和模糊文本的文本区域

java - 卡夫卡 : How do I enable client logging?

java - 所有 kafka-pod 升级后,java 中的 kafka 消费者客户端无法重新连接到 kubernetes kafka 代理

kotlin - 使用自定义(ConsumerAware)错误处理程序时如何查找和提交?