java - 获取相关 ID 为 92 的元数据时出错 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}

标签 java apache-kafka kafka-producer-api

我创建了一个示例应用程序来检查我的生产者的代码。当我在没有分区键的情况下发送数据时,我的应用程序运行良好。但是,在指定数据分区的键时,我收到错误:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}

对于消费者和生产者来说。我在互联网上搜索了很多,他们建议验证 kafka.acl 设置。我在 HDInsight 上使用kafka,但我不知道如何验证它并解决此问题。

我的集群具有以下配置:

  1. 头节点:2
  2. 工作节点:4
  3. 动物园管理员:3

我的生产者代码:

public static void produce(String brokers, String topicName) throws IOException{

    // Set properties used to configure the producer
    Properties properties = new Properties();
      // Set the brokers (bootstrap servers)
    properties.setProperty("bootstrap.servers", brokers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // specify the protocol for Domain Joined clusters

    //To create an Idempotent Producer
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
    properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); 
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    producer.initTransactions();
    // So we can generate random sentences
    Random random = new Random();
    String[] sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature",
         };


    for(String sentence: sentences){
        // Send the sentence to the test topic
        try
        {
            String key=sentence.substring(0,2);
            producer.beginTransaction();
            producer.send(new ProducerRecord<String, String>(topicName,key,sentence)).get();
        }
        catch (Exception ex)
        {
          System.out.print(ex.getMessage());
            throw new IOException(ex.toString());
        }
        producer.commitTransaction();
    }
}

此外,我的主题由 3 个分区组成,复制因子=3

最佳答案

我将复制因子设置为小于分区数量,这对我有用。这对我来说听起来很奇怪,但是是的,它在之后就开始工作了。

关于java - 获取相关 ID 为 92 的元数据时出错 : {myTest=UNKNOWN_TOPIC_OR_PARTITION},我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61217084/

相关文章:

java - 应用程序在尝试读取联系人时挂起

java - 从其他类调用java swing控件和容器

java - 如何使用带有 while 循环的方法

java - 使用kafka的双向消息系统

java - kafka 在客户端 java 中覆盖 advertised.host.name

java - GSON解析错误:Expected BEGIN_OBJECT but was STRING when response value is null

apache-kafka - max.poll.records 如何影响消费者投票

ruby - Logstash-输入文件插件以将数据保留在内存中

java - 当我使用 Apache Storm 运行 JAR 时,为什么会引发 java.lang.NoClassDefFoundError ? (org/apache/storm/kafka/spout/KafkaSpoutConfig)

java - Logstash kafka 输入插件无法通过新消费者读取任何消息,并且无法将 auto_offset_reset 设置为最早