java - 卡夫卡重复读

标签 java spring apache-kafka kafka-consumer-api spring-kafka

我正在为我的项目使用 Kafka 版本 0.10.2.1Spring boot

我有一个主题的 5 个分区,可供在不同计算机上运行的多个使用者(具有相同的 Group-Id)使用。

我面临的问题是:

我使用这些 Kafka 警告日志重复读取一条消息

组 my-consumer-group 的自动偏移提交失败:由于该组已重新平衡并将分区分配给另一个成员,因此无法完成提交。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着 poll 循环花费了太多时间处理消息。您可以通过增加 session 超时或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。

日志显示该问题是由于Kafka Consumer提交失败造成的。

以下是有关我的用例的一些详细信息:

  • 我的主题 My-Topic 有多个使用者,它们属于同一组 ID my-consumer-group

  • 消费者使用来自 Kafka 的消息,应用业务逻辑并将处理后的数据存储在 Cassandra

  • 从 Kafka 消费消息、应用业务逻辑然后将其保存到 Cassandra 的过程从 Kafka 消费的每条消息大约需要 10 毫秒

我正在使用以下代码创建 Kafka-consumer bean

@Configuration
@EnableKafka
public class KafkaConsumer {
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokerURL;

    @Value("${spring.kafka.session.timeout}")
    private int sessionTimeout;

    @Value("${spring.kafka.consumer.my-group-id}")
    private String groupId;

    @Value("${spring.kafka.listener.concurrency}")
    private int concurrency;

    @Value("${spring.kafka.listener.poll-timeout}")
    private int timeout;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(timeout);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
} 

这些是我正在使用的 kafka 配置

spring.kafka.listener.concurrency=2
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.session.timeout=50000
spring.kafka.connection.timeout=10000
spring.kafka.topic.partition=5
spring.kafka.message.replication=2

我主要担心的是属于同一消费者组的多个 Kafka 消费者重复读取消息,在我的应用程序中,我必须避免重复输入数据库。

您能否帮我检查一下我的上述 Kafka 配置和 Kafka-consumer-code,以便我可以避免重复读取。

最佳答案

简单的答案是不要使用autoCommit - 它会按计划提交。

相反,让容器进行提交;使用AckMode RECORD

但是,您仍然应该使您的代码具有幂等性 - 始终存在重新交付的可能性;只是提交策略越可靠,概率会越小。

关于java - 卡夫卡重复读,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45470749/

相关文章:

java - Maven 安装构建错误 - 依赖项路径

java - 使用 Spring Cloud Stream 的 Kafka Streams 进程中的 Serde 错误

java - maprsteam 与 spring 集成 java 客户端

java - 为什么我的 Spring @Autowired 字段为空?

java - 在 Tomcat 上的 Web 应用程序中使用 Batik 时出现 "SAX2 driver class org.apache.crimson.parser.XMLReaderImpl not found"

java - 如何禁用碎片 : Tomcat8 SSL return 2 reassembled SSL segments

java - 更新 java.sql.Timestamp

java - Spring,@Transactional 和 Hibernate 延迟加载

spring - 在服务器:"INFO: Initializing Spring root WebApplicationContext"中部署spring应用程序时Tomcat崩溃

spring-boot - 使用 Spring Cloud Stream kafka 动态更改实例索引