我正在使用 @KafkaListener 和 ConcurrentKafkaListenerContainerFactory 来收听 3 个 kafka 主题,每个主题有 10 个分区。我对它的工作原理有几个问题。
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(30);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
我的 listener.ackmode 返回并且 enable.auto.commit 设置为 false 和 partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
1)我对并发的理解是,因为我将并发(在工厂级别)设置为 30,并且我总共有 30 个分区(所有三个主题一起)要读取,每个线程将被分配一个分区。我的理解正确吗?如果我在 @KafkaListener 注释中再次覆盖并发,它会产生什么影响?
2) spring 调用 poll() 方法时,是否从所有三个主题进行轮询?
3) 由于我将 listener.ackmode 设置为返回,它是否会等到所有在单个 poll() 中返回的记录完成后再发出下一个 poll()?如果我的记录处理时间超过 max.poll.interval.ms 会怎样?假设在单个 poll() 调用中返回 1-100 个偏移量,并且我的代码只能在 max.poll.interval.ms 被命中之前处理 50,此时将发出另一个轮询,因为它已经命中 max.poll .interval.ms?如果是这样,下一个 poll() 会从偏移量 51 返回记录吗?
非常感谢您的时间和帮助
最佳答案
my listener.ackmode is return
没有这样的ackmode;由于您没有在出厂时设置它,因此您的实际确认模式是 BATCH(默认值)。要使用 ack 模式记录(如果这就是您的意思),您必须如此配置工厂容器属性。
my understanding about concurrency is ...
您的理解有误;并发不能大于具有最多分区的主题中的分区数 (如果监听器监听多个主题) 。由于每个主题中只有 10 个分区,因此您的实际并发数为 10。
覆盖监听器上的
concurrency
只会覆盖出厂设置;您总是需要至少与并发一样多的分区。When spring call the poll() method, does it poll from all three topics?
不是那种配置;您有 3 个并发容器,每个容器有 30 个消费者在监听一个主题。您有 90 个消费者。
如果所有 3 个主题都有一个监听器,则投票将返回所有 3 个主题的记录;但是您仍然可能有 20 个空闲的消费者,这取决于分区分配器如何分配分区 - 请参阅日志“分配的分区”以了解分区的确切分配方式。循环分配器应该可以分配它们。
will spring issue another poll at this time
Spring 无法控制 - 如果您花费的时间太长,则消费者线程在监听器中 - 消费者不是线程安全的,因此我们无法发出异步轮询。
您 必须 处理
max.poll.records
中的 max.poll.interval.ms
以避免 Kafka 重新平衡分区。ack 模式没有区别;这一切都是关于及时处理民意调查的结果。
关于apache-kafka - Spring Kafka 轮询,@KafkaListener 和监听器确认模式设置为记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58681876/