java - Apache Kafka LEADER_NOT_AVAILABLE

标签 java queue apache-kafka

我遇到了一个我不理解的 apache Kafka 问题。我在我的经纪人中订阅了一个名为“topic-received”的主题。这是代码:

protected String readResponse(final String idMessage) {
    if (props != null) {
        kafkaClient = new KafkaConsumer<>(props);
        logger.debug("Subscribed to topic-received");
        kafkaClient.subscribe(Arrays.asList("topic-received"));
        logger.debug("Waiting for reading : topic-received");
        ConsumerRecords<String, String> records =    
                       kafkaClient.poll(kafkaConfig.getRead_timeout());

        if (records != null) {
            for (ConsumerRecord<String, String> record : records) {
                logger.debug("Resultado devuelto : "+record.value());
                return record.value();
            }
        }
    }
    return null;
}

当发生这种情况时,我从另一个点向“topic-received”发送消息。代码如下:

private void sendMessageToKafkaBroker(String idTopic, String value) {
    Producer<String, String> producer = null;
    try {
        producer = new KafkaProducer<String, String>(mapProperties());
        ProducerRecord<String, String> producerRecord = new 
                   ProducerRecord<String, String>("topic-received", value);
        producer.send(producerRecord);
        logger.info("Sended value "+value+" to topic-received");
    } catch (ExceptionInInitializerError eix) {
        eix.printStackTrace();
    } catch (KafkaException ke) {
        ke.printStackTrace();
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}

我第一次尝试使用主题“topic-received”时,我收到这样的警告

"WARN 13164 --- [nio-8085-exec-3] org.apache.kafka.clients.NetworkClient   :  
 Error while fetching metadata with correlation id 1 : {topic-  
 received=LEADER_NOT_AVAILABLE}"

但是如果我再试一次,对于这个主题“topic-received”,工作正常,并且没有出现警告。无论如何,这对我没有用,因为我每次都必须从一个主题中收听并发送到一个新主题(由字符串标识符引用,例如:.. 12Erw45-2345Saf-234DASDFasd)

在 google 中寻找 LEADER_NOT_AVAILABLE,有些人谈论将下一行添加到 server.properties 中:

host.name=127.0.0.1
advertised.port=9092
advertised.host.name=127.0.0.1

但它对我不起作用(不知道为什么)。

我尝试使用以下代码在所有这些过程之前创建主题:

 private void createTopic(String idTopic) {
    String zookeeperConnect = "localhost:2181";
    ZkClient zkClient = new ZkClient(zookeeperConnect,10000,10000, 
    ZKStringSerializer$.MODULE$);
    ZkUtils zkUtils = new ZkUtils(zkClient, new 
    ZkConnection(zookeeperConnect),false);
    if(!AdminUtils.topicExists(zkUtils,idTopic)) {
        AdminUtils.createTopic(zkUtils, idTopic, 2, 1, new Properties(), 
    null);
        logger.debug("Created topic "+idTopic+" by super user");
    }
    else{
        logger.debug("topic "+idTopic+" already exists");
    }
  }

没有错误,但它仍然会一直监听直到超时。

我已经查看了代理的属性以检查是否有任何帮助,但我还没有找到足够清楚的东西。我用来阅读的 Prop 是:

    props = new Properties();
    props.put("bootstrap.servers", kafkaConfig.getBootstrap_servers());
    props.put("key.deserializer", kafkaConfig.getKey_deserializer());
    props.put("value.deserializer", kafkaConfig.getValue_deserializer());
    props.put("key.serializer", kafkaConfig.getKey_serializer());
    props.put("value.serializer", kafkaConfig.getValue_serializer());
    props.put("group.id",kafkaConfig.getGroupId());

并且,用于发送...

   Properties props = new Properties();
    props.put("bootstrap.servers", kafkaConfig.getHost() + ":" + 
    kafkaConfig.getPort());
    props.put("group.id", kafkaConfig.getGroup_id());
    props.put("enable.auto.commit", kafkaConfig.getEnable_auto_commit());
    props.put("auto.commit.interval.ms", 
    kafkaConfig.getAuto_commit_interval_ms());
    props.put("session.timeout.ms", kafkaConfig.getSession_timeout_ms());
    props.put("key.deserializer", kafkaConfig.getKey_deserializer());
    props.put("value.deserializer", kafkaConfig.getValue_deserializer());
    props.put("key.serializer", kafkaConfig.getKey_serializer());
    props.put("value.serializer", kafkaConfig.getValue_serializer());

有什么线索吗?为什么我必须使用来自代理和主题的消息的唯一方法是在出错后重复请求?

提前致谢

最佳答案

当尝试向不存在的主题生成消息时会发生这种情况

请注意:在某些 Kafka 安装中,框架可以在主题不存在时自动创建主题,这解释了为什么您在一开始只看到一次该问题。

关于java - Apache Kafka LEADER_NOT_AVAILABLE,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39718975/

相关文章:

java - 为什么 Direct ByteBuffer 在 HornetQ 服务器上不断增加导致 OOM?

java - 使用 NTLM 身份验证在 ja​​va 中访问 Sharepoint 列表

java - 该 Activity 已经有一个由窗口装饰提供的操作栏。更新android studio后

c - 这个简单的队列实现有什么问题吗?

apache-zookeeper - Zookeeper 具有 KeeperException 但 Kafka 能够创建主题并生产/消费

javascript - 如何使用kafka-node从主题读取数据?

java - 如何将 List<String> 转换为 List<Integer>?

php - 如何搭建一个带有倒计时功能的mysql排队系统?

php - 如何为 Laravel 5 设置数据库队列驱动程序?

apache-kafka - Kafka 将单个日志事件行聚合为组合日志事件