java - o.apache.kafka.clients.NetworkClient - Bootstrap broker <hostname>:9092 disconnected

Tags: java spring spring-boot spring-kafka

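I am trying to use a Spring Kafka consumer to consume messages from a Kafka topic, but I see the error below. The same code works fine when I consume messages from a Kafka topic set up on my local machine.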

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN o.apache.kafka.clients.NetworkClient - Bootstrap broker <hostname>:9092 disconnected

I can read the messages from the command line:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ctp_verbose_amcs --from-beginning --zookeeper localhost:2181

Code:

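import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;          // assuming SLF4J is the logging API in use
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// ConciseMessageDeserializer is the asker's own class and is not shown here.
@EnableKafka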
@Configuration
public class KafkaConsumerConfig {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Value(value = "${kafka.bootstrapAddress:localhost:9092}")
    private String bootstrapAddress;

    @Value(value = "${groupId:amcs-tas}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, Map<String, Object>> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new ConciseMessageDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Error message in the server log:

[2017-09-20 14:33:44,448] ERROR Closing socket for <hostname>:9092-10.251.127.31:51014 because of error (kafka.network.Processor)
kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2
        at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
        at kafka.network.Processor.run(SocketServer.scala:413)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2
        at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
        at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)
        at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)
        at org.apache.kafka.common.requests.MetadataRequest.parse(MetadataRequest.java:96)
        at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:48)
        at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92)
        ... 10 more

Best answer

Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2

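Your client version is not compatible with the broker version. The stack trace shows the broker rejecting a Metadata request (API key 3, parsed by MetadataRequest.parse) at version 2, so the client library is speaking a newer protocol version than this broker supports.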

See the compatibility matrix at the bottom of the project page.
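To make the "Invalid version for API key 3: 2" message more concrete, here is a minimal sketch of the kind of check a broker performs on an incoming request. It is not Kafka's actual code: the class ApiVersionCheck, the VersionRange type, and the supported range 0..1 for the Metadata API are hypothetical values chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ApiVersionCheck {

    /** Inclusive range of request versions a broker accepts for one API key. */
    public static final class VersionRange {
        final int min;
        final int max;

        public VersionRange(int min, int max) {
            this.min = min;
            this.max = max;
        }

        boolean contains(int version) {
            return version >= min && version <= max;
        }
    }

    /**
     * Returns true if the broker accepts the given request version for apiKey.
     * An API key missing from the table is treated as unsupported.
     */
    public static boolean isSupported(Map<Integer, VersionRange> brokerSupport,
                                      int apiKey, int version) {
        VersionRange range = brokerSupport.get(apiKey);
        return range != null && range.contains(version);
    }

    public static void main(String[] args) {
        // Hypothetical broker table: Metadata (API key 3) accepted at versions 0..1 only.
        Map<Integer, VersionRange> brokerSupport = new HashMap<>();
        brokerSupport.put(3, new VersionRange(0, 1));

        // The client in the question sent a Metadata request at version 2.
        System.out.println("v1 accepted: " + isSupported(brokerSupport, 3, 1));
        System.out.println("v2 accepted: " + isSupported(brokerSupport, 3, 2));
    }
}
```

A request whose version falls outside the broker's range cannot be parsed, so the broker closes the socket, and the client only sees the "Bootstrap broker ... disconnected" warning. Aligning the client library version with the broker, per the compatibility matrix, removes the mismatch.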

This question originally appeared on Stack Overflow: https://stackoverflow.com/questions/46324067/
