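I am using Spring Boot 2.1.9 and Spring Kafka 2.2.9.
If a message fails a number of times (the limit is configured in the afterRollbackProcessor), the consumer stops polling that record. But if the consumer is restarted, it re-polls the same message and processes it again.
I don't want the message to be re-polled. What is the best way to prevent that?
Here is my configuration: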
@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Poll Timeout
    @Value("${kafka.poll.timeout:300}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    @Value("${kafka.max.session.timeout:60000}")
    private Integer sessionTimoutMs;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);

        // Called once a record has failed retryMaxAttempts times
        AfterRollbackProcessor<String, String> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
                (record, exception) -> {
                    log.warn("failed to process kafka message (retries are exhausted). topic name:" + record.topic()
                            + " value:" + record.value());
                    messageProducer.saveFailedMessage(record, exception);
                }, retryMaxAttempts);
        factory.setAfterRollbackProcessor(afterRollbackProcessor);

        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }
}
How can I achieve this?
Best answer
Set the commitRecovered property to true, and inject a KafkaTemplate configured with the same producer factory as the transaction manager.
/**
 * {@inheritDoc}
 * Set to true and the container will run the
 * {@link #process(List, Consumer, Exception, boolean)} method in a transaction and,
 * if a record is skipped and recovered, we will send its offset to the transaction.
 * Requires a {@link KafkaTemplate}.
 * @param commitRecovered true to process in a transaction.
 * @since 2.2.5
 * @see #isProcessInTransaction()
 * @see #process(List, Consumer, Exception, boolean)
 * @see #setKafkaTemplate(KafkaTemplate)
 */
@Override
public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
    super.setCommitRecovered(commitRecovered);
}
EDIT
Here is the logic in process...
if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
        getSkipPredicate((List) records, exception), this.logger)
            && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
    // if we get here it means retries are exhausted and we've skipped
    ConsumerRecord<K, V> skipped = records.get(0);
    this.kafkaTemplate.sendOffsetsToTransaction(
            Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                    new OffsetAndMetadata(skipped.offset() + 1)));
}
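To see why the processor sends skipped.offset() + 1 rather than the skipped offset itself, here is a toy model in plain Java. It does not use any Kafka classes; the class and method names are made up for illustration. It only relies on the fact that a committed offset is the position of the next record the group will read.

```java
import java.util.ArrayList;
import java.util.List;

public class SkippedOffsetDemo {

    /** Offset to commit after skipping a record: the position of the next record to read. */
    public static long offsetToCommit(long skippedOffset) {
        return skippedOffset + 1;
    }

    /**
     * Offsets a restarted consumer would read from a partition whose last record
     * is at lastOffset, given the committed position for its group.
     */
    public static List<Long> offsetsReadAfterRestart(long committedOffset, long lastOffset) {
        List<Long> offsets = new ArrayList<>();
        for (long o = committedOffset; o <= lastOffset; o++) {
            offsets.add(o);
        }
        return offsets;
    }

    public static void main(String[] args) {
        long failedOffset = 5L;
        long lastOffset = 7L;

        // Nothing committed for the failed record: the position is still 5,
        // so a restart delivers the failed record again.
        System.out.println(offsetsReadAfterRestart(failedOffset, lastOffset)); // [5, 6, 7]

        // Position committed as skipped offset + 1: the restart resumes at 6.
        System.out.println(offsetsReadAfterRestart(offsetToCommit(failedOffset), lastOffset)); // [6, 7]
    }
}
```

The first line of output corresponds to the behaviour described in the question; the second is what committing the recovered record's offset is meant to achieve.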
EDIT2
In 2.2.x, the property is
/**
 * Set to true to run the {@link #process(List, Consumer, Exception, boolean)}
 * method in a transaction. Requires a {@link KafkaTemplate}.
 * @param processInTransaction true to process in a transaction.
 * @since 2.2.5
 * @see #process(List, Consumer, Exception, boolean)
 * @see #setKafkaTemplate(KafkaTemplate)
 */
public void setProcessInTransaction(boolean processInTransaction) {
    this.processInTransaction = processInTransaction;
}
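Applied to the configuration in the question (Spring Kafka 2.2.9), the change might look roughly like the fragment below. This is a sketch, not a tested configuration. It assumes a KafkaTemplate<String, String> bean exists and was built from the same transactional producer factory as the Kafka transaction manager inside chainedTM; the kafkaTemplate parameter is not part of the original code. Note that the variable is declared as DefaultAfterRollbackProcessor rather than the AfterRollbackProcessor interface, because the setters live on the concrete class.

```java
// Fragment of KafkaReceiverConfig; imports and the other beans are as in the question.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
        ChainedKafkaTransactionManager<String, String> chainedTM,
        MessageProducer messageProducer,
        KafkaTemplate<String, String> kafkaTemplate) { // assumed bean, see note above
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // ... remaining container properties unchanged from the question ...
    factory.getContainerProperties().setTransactionManager(chainedTM);

    DefaultAfterRollbackProcessor<String, String> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
            (record, exception) -> messageProducer.saveFailedMessage(record, exception),
            retryMaxAttempts);
    // The template must be transactional, otherwise the offset of the skipped record is not sent.
    afterRollbackProcessor.setKafkaTemplate(kafkaTemplate);
    // 2.2.x name of the property; the first Javadoc excerpt above shows it as setCommitRecovered.
    afterRollbackProcessor.setProcessInTransaction(true);
    factory.setAfterRollbackProcessor(afterRollbackProcessor);
    return factory;
}
```

With this in place, the offset of a recovered record should be committed as part of a transaction, so a restart would resume after it instead of re-delivering it.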
Source: a similar question on Stack Overflow about how to commit offsets in Spring Kafka when a message fails and is handled by the AfterRollbackProcessor: https://stackoverflow.com/questions/59421181/