spring-kafka - Spring Kafka 2.2.2 Consumer仅返回一条记录而不是max-poll-records

标签 spring-kafka consumer

我们正在使用 Spring Kafka 2.2.2 版本,使用 @KafkaListener 和 ConcurrentKafkaListenerContainerFactory 从 Kafka 检索记录。我们已将 max-poll-records 配置为 5,但是它始终只向消费者提供列表中的 1 条记录,而不是 5 条记录。

虽然配置相同,但它可以在 Spring Kafka 2.1.4.Release 中运行。

这是我们的 application.yml 配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      enable-auto-commit: false
      max-poll-records: 5
      bootstrap-servers: localhost:9092
      group-id: group_id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.gap.cascade.li.data.xx.xx.CustomDeserialiser

这是我们的 ConcurrentKafkaListenerContainerFactory:

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);

    return factory;
  }

我们是否缺少 Spring Kafka 2.2.2 版本需要完成的任何配置?

最佳答案

假设你有一个监听者

@KafkaListener(...)
public void listen(List<...> data) {
    ...
}

设置 factory.setBatchListener(true); 应该适合您(只要有多个记录准备就绪)。

您还可以使用启动属性

spring:
  kafka:
    listener:
      type: batch

做同样的事情;避免需要声明自己的工厂。

如果您打开 DEBUG 日志记录,容器将记录轮询返回的记录数。您还可以设置 fetch.min.bytesfetch.max.wait.ms 来影响如果只有一条记录立即准备就绪,则返回多少条记录...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        fetch.min.bytes: 10000
        fetch.max.wait.ms: 2000
    listener:
      type: batch

顺便说一句,当前的 2.2.x 版本是 2.2.7(启动 2.1.6)。

关于spring-kafka - Spring Kafka 2.2.2 Consumer仅返回一条记录而不是max-poll-records,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56724878/

相关文章:

java - 如何使用 consumer 和 supplier 代替 java 8 中的 Reflection

java - RabbitMQ - 只有一个队列,多个消费者接收不同的消息

java - 多次从 Kafka 读取同一条消息

python-3.x - Celery SQS 消费者上的填充不正确

spring-boot - 我可以在运行时向我的@kafkalistener 添加主题吗

spring - EmbeddedKafka如何在单元测试中检查收到的消息

rabbitmq - 从rabbitmq断开连接的方法

Python AsyncJsonWebSocketConsumer 问题

apache-kafka - 如何使用 spring-kafka 为监听器传递多个引导服务器

java - 访问外部库构造对象中的 Spring 应用程序上下文