java - 在Java中创建之前检查kafka中是否存在主题

标签 java apache-kafka

我正在尝试使用以下方法在 kafka 0.8.2 中创建一个主题:

AdminUtils.createTopic(zkClient, myTopic, 2, 1, properties);

如果我在本地多次运行代码进行测试,则会失败,因为主题已经创建。有没有办法在创建主题之前检查主题是否存在? TopicCommand api 似乎没有为 listTopicsdescribeTopic 返回任何内容 .

最佳答案

您可以使用 kakfa-client 版本 0.11.0.0 的 AdminClient

示例代码:

    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhist:9091");

    AdminClient admin = AdminClient.create(config);
    ListTopicsResult listTopics = admin.listTopics();
    Set<String> names = listTopics.names().get();
    boolean contains = names.contains("TEST_6");
    if (!contains) {
        List<NewTopic> topicList = new ArrayList<NewTopic>();
        Map<String, String> configs = new HashMap<String, String>();
        int partitions = 5;
        Short replication = 1;
        NewTopic newTopic = new NewTopic("TEST_6", partitions, replication).configs(configs);
        topicList.add(newTopic);
        admin.createTopics(topicList);
    }

关于java - 在Java中创建之前检查kafka中是否存在主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30572023/

相关文章:

c# - 在 C# 或 Java 中抽象一个 id 属性

java - findViewById 无法识别

docker - 如何部署其他Kafka Broker(Docker镜像)

java - 如果某些 Kafka 节点时间偏移不同步,Spark 流作业会卡住

java - 使用Optional.of()方法进行方法链接是个好主意吗?

java - 如何断开 Spring Webflux 应用程序中的空闲客户端?

java - 中缀到后缀表达式中的结合性规则

nullpointerexception - NPE 同时反序列化 kafka 流中的 avro 消息

node.js - 我可以限制kafka Node 消费者的消费吗?

java - 由 : java. io.NotSerializedException : org. apache.kafka.clients. Producer.KafkaProducer 引起