java - Kafka Java Consumer SDK 长拉取而不使用 while

标签 java apache-kafka

我尝试使用 Kafka Java SDK 来实现消费者,但是我看到的大多数消费者示例都使用 while(true) 循环,并在循环内调用 consume 方法来获取一条消息。

while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(1000);
            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });
            consumer.commitAsync();
        }

我想知道是否有任何优雅的方法可以在不使用 while 循环的情况下处理这个问题,这类似于以下 RabbitMQ 实现:

Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);

最佳答案

您可以尝试使用 Spring-kafka,它具有 @KafkaListener 注释并使方法监听主题,了解更多信息 here

因为在apache-kafka中没有优雅的方法来使方法作为主题的监听器,因为消费者需要在一定的时间间隔内轮询记录,需要循环中的代码

@KafkaListener(topics = "topicName", group = "foo")
public void listen(String message) {
System.out.println("Received Messasge in group foo: " + message);
}

关于java - Kafka Java Consumer SDK 长拉取而不使用 while,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51814849/

相关文章:

apache-kafka - Apache Kafka 默认编码器不工作

java - http客户端错误

Java隐藏JFrame1和JFrame2,当JFrame0停用时?

java - 将keras模型加载到java程序以预测新输入

apache-kafka - 在kafka中发送同步消息?

java - 有没有办法从Kafka主题中获取最后一条消息?

java - Log4j2不同jar的不同日志

java - 如何通过其图形拖动选项卡?

apache-kafka - Kafka - 消耗直到空

java - 使用Python反序列化Java org.apache.kafka.common.serialization序列化对象