java - How to handle offset commit failures in Spark Streaming with Kafka when enable.auto.commit is disabled?

Tags: java apache-kafka spark-streaming apache-spark-2.0

I am using Spark 2.0.0 and Kafka 0.10.2.

I have a long-running application that processes messages from Kafka.

From time to time I see the following message in the logs. I understand how to increase the timeouts and all that, but what I want to know is: given that I did hit this error, how do I recover from it?

ERROR ConsumerCoordinator: Offset commit failed. org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing.
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

This is not about how I avoid this error, but about how to handle it once it occurs.

Background: under normal circumstances I don't see commit errors, but if one does occur I should be able to recover from it. I am using an AT_LEAST_ONCE setup, so I am perfectly happy reprocessing some messages. I am running Java and using a direct Kafka stream (DirectKafkaStream) with manual commits.

Creating the stream:

JavaInputDStream<ConsumerRecord<String, String>> directKafkaStream =
  KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
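For reference, here is a minimal sketch of what the kafkaParams might look like with enable.auto.commit disabled; the broker address, group id, and the other settings are placeholders of my own, not values from the question:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");   // placeholder broker
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-streaming-group");        // placeholder group id
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);             // commit manually via commitAsync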

Committing the offsets:

((CanCommitOffsets) directKafkaStream.inputDStream()).commitAsync(offsetRanges);
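The offsetRanges above are typically captured per batch inside foreachRDD, as shown in the Spark Streaming + Kafka Integration Guide; a sketch of that pattern:

directKafkaStream.foreachRDD(rdd -> {
  // Capture the offset ranges for this batch before any shuffle/repartition
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  // ... process the records in this batch ...
  // Commit only after processing succeeds (at-least-once semantics)
  ((CanCommitOffsets) directKafkaStream.inputDStream()).commitAsync(offsetRanges);
});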

Best Answer

My understanding of the situation is that you use the Kafka Direct Stream integration (the spark-streaming-kafka-0-10_2.11 module, as described in Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)).

As the error message says:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

Kafka manages which topic partitions a consumer consumes, and the Direct Stream creates a pool of consumers (within a single consumer group).

As with any consumer group, you should expect rebalancing (quoting Chapter 4, "Kafka Consumers - Reading Data from Kafka", of Kafka: The Definitive Guide):

consumers in a consumer group share ownership of the partitions in the topics they subscribe to. When we add a new consumer to the group it starts consuming messages from partitions which were previously consumed by another consumer. The same thing happens when a consumer shuts down or crashes, it leaves the group, and the partitions it used to consume will be consumed by one of the remaining consumers. Reassignment of partitions to consumers also happen when the topics the consumer group is consuming are modified, for example if an administrator adds new partitions.

There are quite a few situations in which a rebalance can happen, and it should be expected. And that is exactly what happened in your case.

You asked:

how can I recover from it? This is not about how I avoid this error but about how to handle it once it occurs.

My answer would be to use the other variant of CanCommitOffsets' commitAsync:

def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit

That gives you access to Kafka's OffsetCommitCallback:

OffsetCommitCallback is a callback interface that the user can implement to trigger custom actions when a commit request completes. The callback may be executed in any thread calling poll().

I think onComplete gives you a hook into how the asynchronous commit finished, so you can act accordingly.
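A minimal sketch of such a callback; the logging and the choice to simply record the failure are my own illustration, not part of the original answer:

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

OffsetCommitCallback callback = new OffsetCommitCallback() {
  @Override
  public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    if (exception != null) {
      // The commit failed, e.g. with CommitFailedException after a rebalance.
      // Under at-least-once semantics it is safe to log and carry on:
      // the uncommitted records will simply be redelivered and reprocessed.
      System.err.println("Offset commit failed for " + offsets + ": " + exception);
    }
  }
};

((CanCommitOffsets) directKafkaStream.inputDStream()).commitAsync(offsetRanges, callback);

With an at-least-once setup like yours, treating a failed commit as "log and continue" is usually enough, since the worst case is reprocessing the uncommitted ranges.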

What I can't help you with much is how to revert the changes in a Spark Streaming application when some offsets could not be committed. I think it requires keeping track of offsets and accepting that some offsets cannot be committed and their messages will be reprocessed.

This question was originally asked on Stack Overflow: https://stackoverflow.com/questions/44020582/
