java - 从 Java API 创建 Kafka 主题

标签 java apache-kafka kafka-producer-api

我正在尝试使用 Java API 创建 Kafka 主题,但无法获取 LEADER。

代码:

int partition = 0;
        ZkClient zkClient = null;
        try {
            String zookeeperHosts = "localhost:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);

            String topicName = "mdmTopic5";
            int noOfPartitions = 2;
            int noOfReplication = 1;
            Properties topicConfiguration = new Properties();
            AdminUtils.createTopic(zkClient, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }

错误:

[2017-10-19 12:14:42,263] WARN Error while fetching metadata with correlation id 1 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,370] WARN Error while fetching metadata with correlation id 3 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,479] WARN Error while fetching metadata with correlation id 4 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Kafka 0.11.0.1支持AdminUtils吗????请让我知道如何在此版本中创建主题。

提前致谢。

最佳答案

自 Kafka 0.11 起,就有了一个用于创建(和删除)主题的适当的管理 API,我建议使用它而不是直接连接到 Zookeeper。

参见 AdminClient.createTopics(): http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/AdminClient.html#createTopics(java.util.Collection)

关于java - 从 Java API 创建 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46824140/

相关文章:

java - 如何使用Java中的函数列表比较两个对象的字段并对结果求和

node.js - NestJs EventBus 在 EventHandler 复制事件

java - 终止期间线程延迟

java - SQL 插入 ORA-00911 : invalid character

apache-kafka - 无法删除 kafka 主题

apache-spark - Spark 和 Zookeeper HA 的多个 Master

apache-kafka - 如何修复 kafka.common.errors.TimeoutException : Expiring 1 record(s) xxx ms has passed since batch creation plus linger time

apache-kafka - 我可以根据Kafka中的特定条件进行消费吗?

go - 如何通过 Kafka 生产者发送 Protobuf 消息

java - Hibernate GWT 集成抛出 "java.lang.NoSuchMethodError: javax.persistence.OneToMany.orphanRemoval()Z"