apache-kafka - Apache 卡夫卡 : Exactly Once in Version 0. 10

标签 apache-kafka kafka-consumer-api

为了实现 Kafka 消费者对消息的一次性处理,我一次提交一条消息,如下所示

public void commitOneRecordConsumer(long seconds) {
        KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();

        try {

            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                try {
                    for (ConsumerRecord<String, String> record : records) {

                        processingService.process(record);

                        consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset() + 1)));

                        System.out.println("Committed Offset" + ": " + record.offset());

                    }
                } catch (CommitFailedException e) {
                    // application specific failure handling
                }
            }
        } finally {
            consumer.close();
        }
    }

上面的代码将消息的处理异步委托(delegate)给下面的另一个类。
@Service
public class ProcessingService {

    @Async
    public void process(ConsumerRecord<String, String> record) throws InterruptedException {
        Thread.sleep(5000L);
        Map<String, Object> map = new HashMap<>();
        map.put("partition", record.partition());
        map.put("offset", record.offset());
        map.put("value", record.value());
        System.out.println("Processed" + ": " + map);
    }

}

但是,这仍然不能保证完全一次传递,因为如果处理失败,它可能仍然提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里有什么选择?

最佳答案

0.10.2 及更早版本的原始答案(对于 0.11 及更高版本,请参阅答案打击)
目前,Kafka 无法提供开箱即用的一次性处理。如果在成功处理消息后提交消息,则可以进行至少一次处理,或者如果在 poll() 之后直接提交消息,则可以进行最多一次处理。在开始处理之前。
(另见 http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-commits 中的“交货保证”段)
但是,如果您的处理是幂等的,则至少一次保证“足够好”,即即使您处理两次记录,最终结果也将是相同的。幂等处理的示例是将消息添加到键值存储。即使您添加了两次相同的记录,第二次插入也只会替换第一个当前键值对,并且 KV 存储中仍然会包含正确的数据。

In your example code above, you update a HashMap and this would be an idempotent operation. Even if your might have an inconsistent state in case of failure if for example only two put calls are executed before the crash. However, this inconsistent state would be fixed on reprocessing the same record again.

The call to println() is not idempotent though because this is an operation with "side effect". But I guess the print is for debugging purpose only.


作为替代方案,您需要在用户代码中实现事务语义,这需要在失败的情况下“撤消”(部分执行)操作。一般来说,这是一个难题。
Apache Kafka 0.11+ 的更新(对于 0.11 之前的版本,请参见上面的答案)
从 0.11 开始,Apache Kafka 支持使用 Kafka Streams 的幂等生产者、事务生产者和一次性处理。它还添加了 "read_committed"模式让消费者只读取提交的消息(并删除/过滤中止的消息)。
  • https://kafka.apache.org/documentation/#semantics
  • https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  • https://www.confluent.io/blog/transactions-apache-kafka/
  • https://www.confluent.io/blog/enabling-exactly-kafka-streams/
  • 关于apache-kafka - Apache 卡夫卡 : Exactly Once in Version 0. 10,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38884267/

    相关文章:

    linux - 运行几天后的 Kafka LeaderNotAvailableException

    java - 卡夫卡消费者滞后

    apache-kafka - 发布和使用不同类型消息的最佳方式是什么?

    java - Datastax Kafka 连接器无法解析 Json 主题

    java - 如何转换/ fork Kafka 流并将其发送到特定主题?

    apache-kafka - 通过 SSH 隧道从 Kafka 集群消费

    networking - Kafka Docker network_mode

    java - @KafkaListener 每天消费一百万多条消息的更好解决方案?

    apache-kafka - 使用@KafkaListener Annotation 时如何暂停 Kafka Consumer

    docker - 基于控制台的 dockerized kafka 消费者