java - 从 Java API 创建 Kafka 主题

标签 java apache-kafka kafka-producer-api

我正在尝试使用 Java API 创建 Kafka 主题,但无法获取 LEADER。

代码:

int partition = 0;
        ZkClient zkClient = null;
        try {
            String zookeeperHosts = "localhost:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);

            String topicName = "mdmTopic5";
            int noOfPartitions = 2;
            int noOfReplication = 1;
            Properties topicConfiguration = new Properties();
            AdminUtils.createTopic(zkClient, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }

错误:

[2017-10-19 12:14:42,263] WARN Error while fetching metadata with correlation id 1 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,370] WARN Error while fetching metadata with correlation id 3 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,479] WARN Error while fetching metadata with correlation id 4 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Kafka 0.11.0.1支持AdminUtils吗????请让我知道如何在此版本中创建主题。

提前致谢。

最佳答案

自 Kafka 0.11 起,就有了一个用于创建(和删除)主题的适当的管理 API,我建议使用它而不是直接连接到 Zookeeper。

参见 AdminClient.createTopics(): http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/AdminClient.html#createTopics(java.util.Collection)

关于java - 从 Java API 创建 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46824140/

相关文章:

java - 获取 "java.lang.UnsupportedOperationException:"

java - .split() 和 [\\W] 创建一个额外的空字符串?

apache-kafka - Kafka在生产者与主题之间设置了压缩类型

kubernetes - 在这个例子中,如何捕获zookeeper-service的IP地址,并将其正确设置到需要它的kafka-broker中?

hadoop - 在Kafka中如果参数 "retries"设置为1或更多,那么会自动重试吗?

java - 卡夫卡流-TimeoutException : Expiring * record(s) for TOPIC:* ms has passed since batch creation

java - 缩放文件夹中的图像

java - 找不到 com.microsoft.sqlserver.jdbc.SQLServerDriver 错误

apache-kafka - kafka key 的最大长度是多少?

apache-kafka - Kafka生产者读取数据文件