apache-kafka - Spring Kafka 轮询,@KafkaListener 和监听器确认模式设置为记录

标签 apache-kafka kafka-consumer-api spring-kafka

我正在使用 @KafkaListener 和 ConcurrentKafkaListenerContainerFactory 来收听 3 个 kafka 主题,每个主题有 10 个分区。我对它的工作原理有几个问题。

    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(30);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
    @KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }

我的 listener.ackmode 返回并且 enable.auto.commit 设置为 false 和 partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor

1)我对并发的理解是,因为我将并发(在工厂级别)设置为 30,并且我总共有 30 个分区(所有三个主题一起)要读取,每个线程将被分配一个分区。我的理解正确吗?如果我在 @KafkaListener 注释中再次覆盖并发,它会产生什么影响?

2) spring 调用 poll() 方法时,是否从所有三个主题进行轮询?

3) 由于我将 listener.ackmode 设置为返回,它是否会等到所有在单个 poll() 中返回的记录完成后再发出下一个 poll()?如果我的记录处理时间超过 max.poll.interval.ms 会怎样?假设在单个 poll() 调用中返回 1-100 个偏移量,并且我的代码只能在 max.poll.interval.ms 被命中之前处理 50,此时将发出另一个轮询,因为它已经命中 max.poll .interval.ms?如果是这样,下一个 poll() 会从偏移量 51 返回记录吗?

非常感谢您的时间和帮助

最佳答案

my listener.ackmode is return



没有这样的ackmode;由于您没有在出厂时设置它,因此您的实际确认模式是 BATCH(默认值)。要使用 ack 模式记录(如果这就是您的意思),您必须如此配置工厂容器属性。

my understanding about concurrency is ...



您的理解有误;并发不能大于具有最多分区的主题中的分区数 (如果监听器监听多个主题) 。由于每个主题中只有 10 个分区,因此您的实际并发数为 10。

覆盖监听器上的 concurrency 只会覆盖出厂设置;您总是需要至少与并发一样多的分区。

When spring call the poll() method, does it poll from all three topics?



不是那种配置;您有 3 个并发容器,每个容器有 30 个消费者在监听一个主题。您有 90 个消费者。

如果所有 3 个主题都有一个监听器,则投票将返回所有 3 个主题的记录;但是您仍然可能有 20 个空闲的消费者,这取决于分区分配器如何分配分区 - 请参阅日志“分配的分区”以了解分区的确切分配方式。循环分配器应该可以分配它们。

will spring issue another poll at this time



Spring 无法控制 - 如果您花费的时间太长,则消费者线程在监听器中 - 消费者不是线程安全的,因此我们无法发出异步轮询。

必须 处理 max.poll.records 中的 max.poll.interval.ms 以避免 Kafka 重新平衡分区。

ack 模式没有区别;这一切都是关于及时处理民意调查的结果。

关于apache-kafka - Spring Kafka 轮询,@KafkaListener 和监听器确认模式设置为记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58681876/

相关文章:

java - 在 Spring boot kafka 上添加分区需要一些时间才能生效

java - 如何从 Java 中 Kafka 的简单消费者返回的 ByteBufferMessageSet 中获取 OffSet?

java - 在 Kafka 中重播消息

apache-kafka - 保持两个 kafka 集群同步的最佳方法是什么

java - Confluence Cloud Apache Kafka Consumer - 主题 [test-1] 不存在且 MissingTopicsFatal 为 true

java - 使用 strimzi 在 Openshift 上设置 Kafka

java - Elasticsearch异常: mappping depth in index has been exceeded

java - Kafka消费者集群环境偏移

apache-kafka - 计算存储在 kafka 主题中的消息数

java - Kafka 消费者需要很长的轮询持续时间