我正在为我的项目使用 Kafka 版本 0.10.2.1
和 Spring boot。
我有一个主题的 5 个分区,可供在不同计算机上运行的多个使用者(具有相同的 Group-Id)使用。
我面临的问题是:
我使用这些 Kafka 警告日志重复读取一条消息
组 my-consumer-group 的自动偏移提交失败:由于该组已重新平衡并将分区分配给另一个成员,因此无法完成提交。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着 poll 循环花费了太多时间处理消息。您可以通过增加 session 超时或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。
日志显示该问题是由于Kafka Consumer提交失败造成的。
以下是有关我的用例的一些详细信息:
我的主题
My-Topic
有多个使用者,它们属于同一组 IDmy-consumer-group
消费者使用来自 Kafka 的消息,应用业务逻辑并将处理后的数据存储在
Cassandra
从 Kafka 消费消息、应用业务逻辑然后将其保存到 Cassandra 的过程从 Kafka 消费的每条消息大约需要 10 毫秒。
我正在使用以下代码创建 Kafka-consumer bean
@Configuration
@EnableKafka
public class KafkaConsumer {
@Value("${spring.kafka.bootstrap-servers}")
private String brokerURL;
@Value("${spring.kafka.session.timeout}")
private int sessionTimeout;
@Value("${spring.kafka.consumer.my-group-id}")
private String groupId;
@Value("${spring.kafka.listener.concurrency}")
private int concurrency;
@Value("${spring.kafka.listener.poll-timeout}")
private int timeout;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(timeout);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
}
这些是我正在使用的 kafka 配置
spring.kafka.listener.concurrency=2
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.session.timeout=50000
spring.kafka.connection.timeout=10000
spring.kafka.topic.partition=5
spring.kafka.message.replication=2
我主要担心的是属于同一消费者组的多个 Kafka 消费者重复读取消息,在我的应用程序中,我必须避免重复输入数据库。
您能否帮我检查一下我的上述 Kafka 配置和 Kafka-consumer-code,以便我可以避免重复读取。
最佳答案
简单的答案是不要使用autoCommit
- 它会按计划提交。
相反,让容器进行提交;使用AckMode
RECORD
。
但是,您仍然应该使您的代码具有幂等性 - 始终存在重新交付的可能性;只是提交策略越可靠,概率会越小。
关于java - 卡夫卡重复读,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45470749/