java - 如何处理 OffsetOutOfRangeException 错误?

标签 java apache-kafka apache-storm

我正在使用 stormkafka 来分析实时数据。

我在 spout 中收到以下错误

错误

kafka.common.OffsetOutOfRangeException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:104)
at kafka.message.MessageSet.size(MessageSet.scala:87)
at storm.kafka.PartitionManager.fill(PartitionManager.java:113)
at storm.kafka.PartitionManager.next(PartitionManager.java:83)
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:106)
at backtype.storm.daemon.executor$fn__3430$fn__3445$fn__3474.invoke(executor.clj:547)
at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)

备注:

  1. spout 的 startOffsetTime 为 -1。
  2. Storm 版本 - 0.9.0
  3. kafka 版本 - 0.7.2

如何解决这个问题?

任何建议将不胜感激。

最佳答案

kafka.common.OffsetOutOfRangeException

通常表示客户端请求的范围在服务器上不再可用

这可能会发生,因为根据您的 Kafka 配置中的保留策略,主题日志中具有请求偏移量的消息不再存在。

以下是配置示例:(查看它并根据 将其设置为最佳设置)

############################# Log Retention Policy #############################    
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.    
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies
log.cleanup.interval.mins=1

注意:Kafka 会根据您的配置自动从文件中删除消息,消费者会在 zookeeper 中保留分区偏移量。 (现在考虑偏移量是 3000)。当 Kafka 完成清理时,该分区的偏移量被重置,因此最大偏移量必须小于存储在 zookeeper 中的一个消费者 (3000)。当消费者从 zookeeper 获取当前偏移量(即再次为 3000)并使用此偏移量从 Kafka 读取不存在的消息时,这可能是一个问题。 所以解决方案是将自动删除间隔处理到最优。

还可以查看以下链接以获取更多信息。

关于java - 如何处理 OffsetOutOfRangeException 错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24883455/

相关文章:

java - 使用kafka流根据消息 key 向主题发送消息

scala - Alpakka卡夫卡vs卡夫卡流

java - Storm的ExecutorDetails类

java - 使用 Storm 进行动态枢轴

java - 为什么在minor gc之后回收了部分oldGen?

java - 是否可以只运行一个测试类(利用 PowerMock 和 Mockito)?

java - 使用 Spring Kafka 框架时如何处理错误/异常?

java - Storm Kafka-Spout 无法正常工作

Java- 将文本写入图像,然后写入输出文件

java - 上下文或 Activity 之外的 getString