apache-kafka - How to implement an exactly-once Kafka consumer without manually assigning partitions

Tags: apache-kafka kafka-consumer-api

I was going through this article, which explains how to ensure that a message is processed exactly once by doing the following:

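  • On start/restart, read (topic, partition, offset) from the database
  • Read messages starting from that specific (topic, partition, offset)
  • Atomically do the following:
    • Process the message
    • Commit the offset to the database as (topic, partition, offset)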
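
The atomic step in the list above can be sketched as follows. This is a minimal illustration under stated assumptions, not the article's implementation: `TransactionalStore` is a hypothetical in-memory stand-in for a database, the "processing" is just upper-casing the value, and a `synchronized` method plays the role of a database transaction. The point it shows is that when the result and the next offset are written together, a redelivered message can be recognised and skipped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a transactional database.
class TransactionalStore {
    private final List<String> results = new ArrayList<>();
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Writes the processing result and the new offset together.
    // In a real database these two writes would share one transaction;
    // here the synchronized method stands in for that.
    synchronized boolean processAtomically(String topic, int partition, long offset, String value) {
        String key = topic + "-" + partition;
        long expected = nextOffsets.getOrDefault(key, 0L);
        if (offset < expected) {
            return false; // already processed: a replayed message is skipped
        }
        results.add(value.toUpperCase()); // the "processing" of this sketch
        nextOffsets.put(key, offset + 1); // position to resume from after a restart
        return true;
    }

    synchronized long nextOffset(String topic, int partition) {
        return nextOffsets.getOrDefault(topic + "-" + partition, 0L);
    }

    synchronized List<String> results() {
        return new ArrayList<>(results);
    }
}

class ExactlyOnceSketch {
    public static void main(String[] args) {
        TransactionalStore store = new TransactionalStore();
        store.processAtomically("orders", 0, 0L, "a");
        store.processAtomically("orders", 0, 1L, "b");
        // Simulated redelivery after a crash: offset 1 arrives again and is ignored.
        store.processAtomically("orders", 0, 1L, "b");
        System.out.println(store.results());
        System.out.println(store.nextOffset("orders", 0));
    }
}
```

The store keeps the next offset to read rather than the last one processed, so the stored value can be passed straight to a seek on restart.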

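As you can see, it explicitly specifies which partition to read messages from. I don't think that is a good idea, because it prevents Kafka from assigning a fair share of partitions to the active consumers. I can't come up with logic that achieves the same thing without explicitly specifying the partition when polling the Kafka topic inside the consumer. Is it possible?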

Best answer

Good analysis. You make a good point: if at all possible, you should let Kafka handle partition assignment for your consumers.

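There is an alternative to consumer.Assign(Partition[]). Kafka notifies your consumer whenever partitions are revoked from it or assigned to it. For example, the dotnet client library has "SetPartitionsRevokedHandler" and "SetPartitionsAssignedHandler" methods on the consumer builder, which consumers can use to manage their offsets.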

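When partitions are revoked, save the last processed offset of each revoked partition to the database. When new partitions are assigned, fetch the last processed offset for each of them from the database and resume from there.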
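
The save-on-revoke, restore-on-assign bookkeeping described above can be sketched without a Kafka client at all. Everything here is an assumption made for illustration: `RebalanceOffsetTracker` is a hypothetical class, a plain `Map` stands in for the database, partitions are identified by number only (a real consumer would key on topic and partition), and a partition with no stored offset is resumed from 0.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

class RebalanceOffsetTracker {
    // Hypothetical external store shared by all consumers: partition -> next offset to read.
    private final Map<Integer, Long> database;
    // Progress of this consumer only, for the partitions it currently owns.
    private final Map<Integer, Long> lastProcessed = new HashMap<>();

    RebalanceOffsetTracker(Map<Integer, Long> database) {
        this.database = database;
    }

    void recordProcessed(int partition, long offset) {
        lastProcessed.put(partition, offset);
    }

    // Called when partitions are taken away: persist progress, then forget them.
    void onRevoked(Collection<Integer> partitions) {
        for (int p : partitions) {
            Long last = lastProcessed.remove(p);
            if (last != null) {
                database.put(p, last + 1); // resume after the last processed message
            }
        }
    }

    // Called when partitions are handed over: returns where to resume each one.
    Map<Integer, Long> onAssigned(Collection<Integer> partitions) {
        Map<Integer, Long> resume = new HashMap<>();
        for (int p : partitions) {
            resume.put(p, database.getOrDefault(p, 0L));
        }
        return resume;
    }
}

class RebalanceSketch {
    public static void main(String[] args) {
        Map<Integer, Long> db = new HashMap<>();
        RebalanceOffsetTracker first = new RebalanceOffsetTracker(db);
        RebalanceOffsetTracker second = new RebalanceOffsetTracker(db);

        first.onAssigned(Arrays.asList(0, 1));
        first.recordProcessed(1, 41L);
        // Partition 1 moves from the first consumer to the second.
        first.onRevoked(Arrays.asList(1));
        System.out.println(second.onAssigned(Arrays.asList(1)));
    }
}
```

In a real consumer the map returned by `onAssigned` would feed the seek calls made from the rebalance callback.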

C# example:

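using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public class Program
{
   public static void Main(string[] args)
   {
      // Placeholder settings. Auto commit is off because offsets live in the database.
      var config = new ConsumerConfig
      {
         BootstrapServers = "localhost:9092",
         GroupId = "my-group",
         EnableAutoCommit = false
      };

      using (
         var consumer = new ConsumerBuilder<string, string>(config)
                      .SetErrorHandler(ErrorHandler)
                      .SetPartitionsRevokedHandler(HandlePartitionsRevoked)
                      .SetPartitionsAssignedHandler(HandlePartitionsAssigned)
                      .Build()
      )
      {
         // Subscribe (not Assign) so that Kafka balances partitions across the group.
         consumer.Subscribe("my-topic");
         while (true)
         {
            // The rebalance handlers are invoked from inside this call.
            var result = consumer.Consume();
            // Process 'result' and store its offset in the same database transaction.
         }
      }
   }

   static void ErrorHandler(IConsumer<string, string> consumer, Error error)
   {
      System.Console.Error.WriteLine(error.Reason);
   }

   public static IEnumerable<TopicPartitionOffset>
   HandlePartitionsRevoked
   (
      IConsumer<string, string> consumer,
      List<TopicPartitionOffset> currentTopicPartitionOffsets
   )
   {
      // Save the last processed offset for each partition in 'currentTopicPartitionOffsets'.
      Persist(currentTopicPartitionOffsets);
      // Nothing is assigned as a result of a revocation.
      return new List<TopicPartitionOffset>();
   }

   public static IEnumerable<TopicPartitionOffset> HandlePartitionsAssigned
   (
      IConsumer<string, string> consumer,
      List<TopicPartition> tps
   )
   {
      // The returned offsets are where the consumer starts reading each new partition.
      List<TopicPartitionOffset> tpos = FetchOffsetsFromDbForTopicPartitions(tps);
      return tpos;
   }

   // Hypothetical helper: write the offsets to your database.
   static void Persist(IEnumerable<TopicPartitionOffset> offsets) { /* database write goes here */ }

   // Hypothetical helper: look up the stored offset for each partition.
   static List<TopicPartitionOffset> FetchOffsetsFromDbForTopicPartitions(List<TopicPartition> tps)
   {
      // Placeholder: start from the beginning until a real lookup is implemented.
      return tps.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)).ToList();
   }
}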

Java example from the ConsumerRebalanceListener docs:

If you are writing in Java, you can implement the "ConsumerRebalanceListener" interface and pass your implementation to the consumer.subscribe(topics, listener) method. The example below is taken verbatim from the Kafka docs linked above:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
       private Consumer<?,?> consumer;

       public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
           this.consumer = consumer;
       }

       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           // save the offsets in an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              saveOffsetInExternalStore(consumer.position(partition));
       }

       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
           // read the offsets from an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              consumer.seek(partition, readOffsetFromExternalStore(partition));
       }
}

If my understanding is correct, you would use the Java version like this: consumer.subscribe(Collections.singletonList("My topic"), new SaveOffsetsOnRebalance(consumer))

For more information, see the "Storing Offsets Outside Kafka" section of the Kafka docs.

Here is an excerpt from those docs that summarizes how to store partitions and offsets for exactly-once processing:

Each record comes with its own offset, so to manage your own offset you just need to do the following:

  • Configure enable.auto.commit=false
  • Use the offset provided with each ConsumerRecord to save your position.
  • On restart restore the position of the consumer using seek(TopicPartition, long).

This type of usage is simplest when the partition assignment is also done manually (this would be likely in the search index use case described above). If the partition assignment is done automatically special care is needed to handle the case where partition assignments change. This can be done by providing a ConsumerRebalanceListener instance in the call to subscribe(Collection, ConsumerRebalanceListener) and subscribe(Pattern, ConsumerRebalanceListener). For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by implementing ConsumerRebalanceListener.onPartitionsRevoked(Collection). When partitions are assigned to a consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer to that position by implementing ConsumerRebalanceListener.onPartitionsAssigned(Collection).

Another common use for ConsumerRebalanceListener is to flush any caches the application maintains for partitions that are moved elsewhere.
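
The first bullet of the excerpt, turning off auto commit, can be sketched as a plain `Properties` object of the kind a Java consumer is typically constructed from. The class name `ManualOffsetConsumerConfig`, the broker address, and the group id are placeholders chosen for this sketch. The property keys and the deserializer class name are the standard Kafka client ones.

```java
import java.util.Properties;

class ManualOffsetConsumerConfig {
    // Builds consumer settings for the case where offsets are stored outside Kafka.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Offsets are kept in an external store, so Kafka must not commit them automatically.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id.
        Properties props = build("localhost:9092", "my-group");
        System.out.println(props.getProperty("enable.auto.commit"));
    }
}
```

Keeping a group id while disabling auto commit is what lets Kafka keep balancing partitions across the group while the application alone decides where each partition resumes.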

This question, "apache-kafka - How to implement an exactly-once Kafka consumer without manually assigning partitions", comes from a similar question found on Stack Overflow: https://stackoverflow.com/questions/58029989/
