apache-kafka - Kafka消费者轮询和重新连接

标签 apache-kafka long-polling kafka-consumer-api

我们刚刚开始在我们的项目中使用 Kafka。我们正在使用 kafka_2.11-0.9.0.0。我有一些与 KafkaConsumer 相关的问题。

1) 我在启动 Zookeeper 和 Kafka 服务器之前启动了 Kafka Consumer,但我的 KafkaConsumer 客户端仍然能够连接。我有以下代码行

    Consumer<String, String> consumer =  new KafkaConsumer<String,String>(props);
    consumer.subscribe(getConsumerRegisteredTopics());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records){
           processRecord (record)
        }
   }  

2) 我读到,Zookeeper 通过使用 poll(long timeout) 方法调用来跟踪事件的 Consumer。如果我使用 Long.MAX_VALUE 在 poll() 中超时,zookeeper 将如何跟踪我的消费者。你能帮我理解 KafkaConsumer 轮询调用的行为吗。

提前致谢。

最佳答案

1) 如果您在启动消费者之前没有启动 zookeeper 和 kafka,它无法连接,但会尝试从 kafka 读取元数据。我的经验是,KafkaConsumer 的“轮询”调用将不确定地阻塞,直到它能够连接和读取元数据。换句话说……您的消费者实际上并没有连接,而是在等待 kafka 集群出现。

2) 轮询超时告诉消费者等待多长时间才能返回任何数据。您必须确保在轮询返回后尽快再次调用轮询以使您的消费者保持活跃。给轮询调用的超时与 KafkaConsumer 的保持事件机制无关(这由您的消费者的消费者属性的 session.timeout.ms 属性控制).

关于apache-kafka - Kafka消费者轮询和重新连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38597037/

相关文章:

java - 如何只打印kafka记录值而不打印所有其他数据?

c - librdkafka C API Kafka 消费者无法正确读取所有消息

apache-kafka - 如何在没有 kill 命令的情况下安全地重启 kafka-manager?

scala - KeeperErrorCode = NoNode for/brokers/topics/test-topic/partitions

apache-kafka - 如何在 kubernetes 上为 Kafka 多代理设置指定广告监听器并对外公开集群?

ssl - WARN 发送 SSL 关闭消息失败(Kafka SSL 配置问题)

android - 使用改造和 kotlin channel 实现长轮询

php - 使用 Laravel 的实时用户消息

python - Python(和 Flask)中的非消息队列/简单长轮询

apache-kafka - 卡夫卡消费者卡在(重新)加入组中