java - 我们可以使用spring kafka批处理监听器来实现消息的一次处理吗?

标签 java spring-kafka

我试图实现对 kafka 主题上的每条消息进行一次处理。这是我的配置:

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 4096000);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 120000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);  
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8192000);

我已将确认模式设置为手动,并发度设置为 2。

但是它不止一次地消耗消息。有没有人遇到过这个问题。 此外,通过上述配置,消费者在一批中总是只收到一条消息。我尝试增加 fetch.min.bytesfetch.max.wait.ms,但没有任何影响。

我对 ConcurrentKafkaListenerContainerFactory 进行如下更改后,批量配置的问题得到解决:

ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3600000);

factory.getContainerProperties().setAckMode(org.springframework.kafka.listner.ContainerProperties.AckMode.MANUAL); factory.setMessageConverter(new BatchMessagingMessageConverter(stringJsonMessageConverter()));

最佳答案

要获得恰好一次语义,您必须使用 transactions .

但是,恰好一次语义仅适用于

read from Kafka -> process -> write to Kafka

即使如此,它也只适用于整个流程(读/处理/写)。

对于同一条记录,单独的读取和处理步骤可能会被多次调用(如果处理或写入失败)。唯一的保证是整个事情只会被处理一次。

关于java - 我们可以使用spring kafka批处理监听器来实现消息的一次处理吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61184794/

相关文章:

java - 线程中的异常 "AWT-EventQueue-0"java.lang.StringIndexOutOfBoundsException : String index out of range: 97

java - HashMap 插入空值而不是字符串?

java - SpringBoot @KafkaListener 得到 MessageConversionException : Cannot convert from A to B

java - Spring Kafka Consumer 如何跳过 Avro Deserializer 异常

java - Android Auto - 后台服务与Activity之间的通信

java - 如何将数据从服务器放到 Kinesis Stream

java - 如何将模拟注入(inject)具有@Transactional 的@Service

spring-boot - Spring Cloud Kafka Stream 无法创建生产者配置错误

java - 如何编写 Kafka 消费者——单线程 vs 多线程

spring - Kafka 监听器中的钩子(Hook)