我们正在使用 Spring Kafka 2.2.2 版本,使用 @KafkaListener 和 ConcurrentKafkaListenerContainerFactory 从 Kafka 检索记录。我们已将 max-poll-records 配置为 5,但是它始终只向消费者提供列表中的 1 条记录,而不是 5 条记录。
虽然配置相同,但它可以在 Spring Kafka 2.1.4.Release 中运行。
这是我们的 application.yml 配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
enable-auto-commit: false
max-poll-records: 5
bootstrap-servers: localhost:9092
group-id: group_id
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.gap.cascade.li.data.xx.xx.CustomDeserialiser
这是我们的 ConcurrentKafkaListenerContainerFactory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
我们是否缺少 Spring Kafka 2.2.2 版本需要完成的任何配置?
最佳答案
假设你有一个监听者
@KafkaListener(...)
public void listen(List<...> data) {
...
}
设置 factory.setBatchListener(true);
应该适合您(只要有多个记录准备就绪)。
您还可以使用启动属性
spring:
kafka:
listener:
type: batch
做同样的事情;避免需要声明自己的工厂。
如果您打开 DEBUG 日志记录,容器将记录轮询返回的记录数。您还可以设置 fetch.min.bytes
和 fetch.max.wait.ms
来影响如果只有一条记录立即准备就绪,则返回多少条记录...
spring:
kafka:
consumer:
auto-offset-reset: earliest
enable-auto-commit: false
properties:
fetch.min.bytes: 10000
fetch.max.wait.ms: 2000
listener:
type: batch
顺便说一句,当前的 2.2.x 版本是 2.2.7(启动 2.1.6)。
关于spring-kafka - Spring Kafka 2.2.2 Consumer仅返回一条记录而不是max-poll-records,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56724878/