我试图实现对 kafka 主题上的每条消息进行一次处理。这是我的配置:
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 4096000);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 120000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8192000);
我已将确认模式
设置为手动,并发度设置为 2。
但是它不止一次地消耗消息。有没有人遇到过这个问题。
此外,通过上述配置,消费者在一批中总是只收到一条消息。我尝试增加 fetch.min.bytes
和 fetch.max.wait.ms
,但没有任何影响。
我对 ConcurrentKafkaListenerContainerFactory 进行如下更改后,批量配置的问题得到解决:
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3600000);
factory.getContainerProperties().setAckMode(org.springframework.kafka.listner.ContainerProperties.AckMode.MANUAL); factory.setMessageConverter(new BatchMessagingMessageConverter(stringJsonMessageConverter()));
最佳答案
要获得恰好一次语义,您必须使用 transactions .
但是,恰好一次语义仅适用于
read from Kafka -> process -> write to Kafka
即使如此,它也只适用于整个流程(读/处理/写)。
对于同一条记录,单独的读取和处理步骤可能会被多次调用(如果处理或写入失败)。唯一的保证是整个事情只会被处理一次。
关于java - 我们可以使用spring kafka批处理监听器来实现消息的一次处理吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61184794/