我使用 Spark-sql-2.4.1 版本和 Kafka 0.10 v.
当我尝试按消费者消费数据时。 即使将“auto.offset.reset”设置为“最新”
后,它也会出现以下错误org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {COMPANY_INBOUND-16=168}
at org.apache.kafka.clients.consumer.internals.Fetcher.throwIfOffsetOutOfRange(Fetcher.java:348)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:396)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:999)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:470)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchRecord(KafkaDataConsumer.scala:361)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:251)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
where is the issue ? why setting is not working ? How should it be fixed?
第 2 部分:
.readStream()
.format("kafka")
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
.option("maxOffsetsPerTrigger", 1000)
.option("auto.offset.reset", "latest")
.option("failOnDataLoss", false)
.load();
最佳答案
Spark 结构化流忽略 auto.offset.reset,请改用 runningOffsets 选项
auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
关于apache-spark - 即使将 "auto.offset.reset"设置为 "latest"后也会出现错误 OffsetOutOfRangeException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58653885/