java - Samza 0.14.1 无法正确处理 OffsetOutOfRangeException 异常?

标签 java apache-kafka apache-samza

我们面临着与此 thread 中描述的相同问题.

此处 - Samza 请求的 Kafka 分区偏移量太旧(即 Kafka 日志已向前移动)。我们正在设置属性 consumer.auto.offset.reset最小,因此期望 Samza 在这种情况下将其检查点重置为最早的可用分区偏移量。但这种情况并没有发生,我们不断收到这种形式的异常:

INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
kafka.producer.SyncProducer:[Logging_class:info:66] - [main] -
Disconnecting from vrni-platform-release:9092
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset
56443499 for topic and partition Topic3-0
WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658]
system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While
refreshing brokers for Topic3-0:
org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested
offset is not within the range of offsets maintained by the server..
Retrying

版本详情

  • 萨姆扎:2.11-0.14.1
  • Kafka 客户端:1.1.0
  • Kafka 服务器:1.1.0 Scala 2.11

浏览代码,看来 GetOffset::isValidOffset 应该能够捕获异常 OffsetOutOfRangeException 并将其转换为 false 值。但看来这并没有发生。 Exceptionpackage 是否可能不匹配? GetOffSet类(class)正在 catch 异常import kafka.common.OffsetOutOfRangeException,但是从日志来看,这个类的封装不同。难道是这个原因?

def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
    info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))

    try {
      val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))

      if (messages.hasError) {
        KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
      }

      info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))

      true
    } catch {
      case e: OffsetOutOfRangeException => false
    }
}

此外,看来 BrokerProxy类 - GetOffset 的调用者会打印一条日志 "It indicates that..." 如果它得到一个 false 值,但它不会记录这一行(表明GetOffset 方法中生成的一些异常未被捕获并向上传播):

def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
    debug("Adding new topic and partition %s to queue for %s" format (tp, host))

    if (nextOffsets.asJava.containsKey(tp)) {
      toss("Already consuming TopicPartition %s" format tp)
    }

    val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
      nextOffset
        .get
        .toLong
    } else {
      warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))

      offsetGetter.getResetOffset(simpleConsumer, tp)
    }

    debug("Got offset %s for new topic and partition %s." format (offset, tp))

    nextOffsets += tp -> offset

    metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
  }

这可能是由于我们使用的 Kafka 客户端库版本不匹配造成的吗? 是否有推荐的 Kafka 客户端版本可供我们与 Samza 0.14.1 一起使用(假设 Kafka 服务器是 1.x)?

任何与此相关的帮助将不胜感激。

最佳答案

以上是 samza 0.14.0 和 0.14.1 中的错误。 SAMZA-1822是错误 ID。

这也在 samza mailing list 中进行了讨论。 。

关于java - Samza 0.14.1 无法正确处理 OffsetOutOfRangeException 异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51991805/

相关文章:

java - Android WebView 302 重定向 - 蜂窝

java - 使用 File.getAbsolutePath 时 ClassLoader.getResource 返回 null

java - 如何使用 vert.x java api 将服务器通知(推送通知)发送给客户端

hadoop - 如何在HDFS上部署和运行Samza作业?

java - KeyStore 和 TrustStore 加载失败 - 私钥必须附有证书链

spring - 我可以使用 Spring WebFlux 实现通过 Kafka 请求/响应主题获取数据的 REST 服务吗?

scala - 如何将 Spark 流 DF 写入 Kafka 主题

scala - 使用经过身份验证的 Confluent Schema Registry 配置 Spark 结构化流

java - Samza/Kafka 更新元数据失败

java - 如何在Samza worker上获得应用程序ID?