spring - How to commit offsets in Spring Kafka when a message fails and is handled by the AfterRollbackProcessor

Tags: spring spring-boot apache-kafka spring-kafka

I am using Spring Boot 2.1.9 with Spring Kafka 2.2.9.

If a message fails more times than the limit configured in the AfterRollbackProcessor, the consumer stops polling that record. But if the consumer is restarted, it polls the same message again and processes it.

I don't want the message to be re-polled after a restart; what is the best way to prevent that?

Here is my configuration:

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Poll Timeout
    @Value("${kafka.poll.timeout:300}")
    private Integer pollTimeout;

    // Kafka Consumer Offset Reset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset;

    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    @Value("${kafka.max.session.timeout:60000}")
    private Integer sessionTimeoutMs;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);


    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);

        AfterRollbackProcessor<String, String> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
                (record, exception) -> {
                    log.warn("failed to process kafka message (retries are exhausted). topic name:" + record.topic()
                            + " value:" + record.value());
                    messageProducer.saveFailedMessage(record, exception);
                }, retryMaxAttempts);

        factory.setAfterRollbackProcessor(afterRollbackProcessor);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }

}

How can I achieve this?

Best Answer

Set the commitRecovered property to true, and inject a KafkaTemplate configured with the same producer factory as the one used by the transaction manager.

/**
 * {@inheritDoc}
 * Set to true and the container will run the
 * {@link #process(List, Consumer, Exception, boolean)} method in a transaction and,
 * if a record is skipped and recovered, we will send its offset to the transaction.
 * Requires a {@link KafkaTemplate}.
 * @param commitRecovered true to process in a transaction.
 * @since 2.2.5
 * @see #isProcessInTransaction()
 * @see #process(List, Consumer, Exception, boolean)
 * @see #setKafkaTemplate(KafkaTemplate)
 */
@Override
public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
    super.setCommitRecovered(commitRecovered);
}
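Putting that into the question's configuration, a minimal sketch of the wiring (for versions where setCommitRecovered is exposed, per the javadoc above): inside the kafkaListenerContainerFactory bean, replace the rollback-processor setup with one that commits recovered offsets. The `producerFactory` variable is an assumption here, standing for the same ProducerFactory that backs the KafkaTransactionManager inside `chainedTM`:

```java
// Sketch: build a transactional KafkaTemplate from the producer factory the
// transaction manager uses, and tell the rollback processor to send the
// offset of a recovered (skipped) record to the transaction, so a restarted
// consumer does not re-poll it.
DefaultAfterRollbackProcessor<String, String> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
        (record, exception) -> messageProducer.saveFailedMessage(record, exception),
        retryMaxAttempts);
afterRollbackProcessor.setCommitRecovered(true);                           // commit the skipped record's offset
afterRollbackProcessor.setKafkaTemplate(new KafkaTemplate<>(producerFactory)); // required for commitRecovered
factory.setAfterRollbackProcessor(afterRollbackProcessor);
```

With this in place, once retries are exhausted and the record is recovered, its offset (record offset + 1) is committed within the transaction, so it is not redelivered after a restart.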

EDIT

This is the logic in the process() method...

    if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
            getSkipPredicate((List) records, exception), this.logger)
                && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {

        // if we get here it means retries are exhausted and we've skipped

        ConsumerRecord<K, V> skipped = records.get(0);
        this.kafkaTemplate.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                        new OffsetAndMetadata(skipped.offset() + 1)));
    }

EDIT 2

In 2.2.x, the property is

/**
 * Set to true to run the {@link #process(List, Consumer, Exception, boolean)}
 * method in a transaction. Requires a {@link KafkaTemplate}.
 * @param processInTransaction true to process in a transaction.
 * @since 2.2.5
 * @see #process(List, Consumer, Exception, boolean)
 * @see #setKafkaTemplate(KafkaTemplate)
 */
public void setProcessInTransaction(boolean processInTransaction) {
    this.processInTransaction = processInTransaction;
}
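Since the question is on Spring Kafka 2.2.9, the equivalent wiring on the 2.2.x line uses this property name instead. A minimal sketch under the same assumption that `producerFactory` is the ProducerFactory backing the transaction manager:

```java
// Sketch for Spring Kafka 2.2.x (>= 2.2.5): same idea as commitRecovered,
// but the property is named processInTransaction on this line.
DefaultAfterRollbackProcessor<String, String> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
        (record, exception) -> messageProducer.saveFailedMessage(record, exception),
        retryMaxAttempts);
afterRollbackProcessor.setProcessInTransaction(true); // run process() in a transaction, commit skipped offsets
afterRollbackProcessor.setKafkaTemplate(new KafkaTemplate<>(producerFactory));
factory.setAfterRollbackProcessor(afterRollbackProcessor);
```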

A similar question, "How to commit offsets in Spring Kafka when a message fails and is handled by the AfterRollbackProcessor", can be found on Stack Overflow: https://stackoverflow.com/questions/59421181/
