java - ReplicaFetcher 崩溃 - 第一个偏移量 XXXXXXX 小于下一个偏移量 YYYYYYYY

标签 java scala apache-kafka

我们有一个由 3 个盒子组成的 2.3 Kafka 集群。几天前,当我们将其升级到 2.3 时,我们注意到这些日志消息导致两个代理上的一个主题分区的replicaFetcher 线程崩溃:

[2019-08-09 15:02:43,520] ERROR [ReplicaFetcher replicaId=4, leaderId=3, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-21 at offset 57542337 (kafka.server.R
eplicaFetcherThread)
kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to __consumer_offsets-21. First offset 57542333 is less than the next offset 57542337. First 10 offsets in append: List(57542333,
57542334, 57542335, 57542336, 57542337, 57542338, 57542339, 57542340, 57542341, 57542342), last offset in append: 57570869. Log start offset = 56949140
        at kafka.log.Log.$anonfun$append$2(Log.scala:929)
        at kafka.log.Log.maybeHandleIOException(Log.scala:2065)
        at kafka.log.Log.append(Log.scala:850)
        at kafka.log.Log.appendAsFollower(Log.scala:830)
        at kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1(Partition.scala:726)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
        at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:717)
        at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:733)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:161)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:317)
        at scala.Option.foreach(Option.scala:274)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:306)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:305)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:305)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:305)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:133)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:132)
        at scala.Option.foreach(Option.scala:274)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:132)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)
[2019-08-09 15:02:43,524] WARN [ReplicaFetcher replicaId=4, leaderId=3, fetcherId=0] Partition __consumer_offsets-21 marked as failed (kafka.server.ReplicaFetcherThread)

影响是一个 Broker 无法成为该主题分区的 ISR(实际上第二个 Broker 也有同样的问题,所以我们只有一个 ISR,它是领导者)。

我仍然对这条消息感到困惑,并且无法正确理解它,所以我无法找到正确的方法来解决这个问题。 我真的很想了解这里发生了什么,但不确定我是否理解下面的代码:

https://github.com/apache/kafka/blob/a48b5d900c6b5c9c52a97124a1b51aff3636c32c/core/src/main/scala/kafka/log/Log.scala#L1081-L1098

 if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {

当replicaFetcher必须追加记录时,它如何访问nextOffset信息..? 不确定是否理解此分析的具体作用(要追加的当前记录?):

val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

还有这个:

nextOffsetMetadata

这是下一批记录吗..?它如何访问任何“下一个”记录元数据?

如果有人能稍微澄清一下,那就太好了。 与此同时,解决这个问题的解决方案会很好,但我仍然希望清楚地理解它。

编辑:

经过一番研究,有些事情变得更加清晰了。 nextOffset 只是 Activity 段的最新偏移量+1(这些元数据来自 loadSegments() 调用)。

总结一下,发生了什么: 副本从领导者获取段,其起始偏移量低于 Activity 段的最新偏移量。 所以我的问题是,为什么副本不只是截断?

雅尼克

最佳答案

我们的一个 kafka 集群正在运行 kafka 1.1.1,并且遇到了同样的问题,使用类固醇。在我们的例子中,ReplicaFetcher 崩溃了,并且完全停止了复制。 解决方案是删除受感染的分区,并让 kafka 从健康的副本中重新创建。我们尝试修复分区文件夹上不一致的条目,但没有成功。

关于java - ReplicaFetcher 崩溃 - 第一个偏移量 XXXXXXX 小于下一个偏移量 YYYYYYYY,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57475689/

相关文章:

python - 我怎样才能找到kafka配置文件?

java - 注册时使用 hibernate 检查我的数据库中是否有电子邮件

Java 常量字符串在 Android 中被损坏

scala - 传递的函数参数的协方差

multithreading - 多线程Scala中长进程的惯用超时

java - 这是Java中 'Phantom Types'的准确转换吗?

java - 如何在java中创建检查条件语句大括号的逻辑?

JavaFX PrintAPI 错误的 PaperSource

java - Kafka Broker 偏移/日志保留和消费者偏移在最早模式下重置

hadoop - Kafka 控制台生产者丢失消息