apache-spark - Spark Streaming job fails after new partitions are assigned (old partitions revoked) for Kafka topic: No current assignment for partition topic1

Tags: apache-spark apache-kafka spark-streaming offset kafka-consumer-api

We are using Spark Streaming with Kafka and create a direct stream with the following code:

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> conf.getString("kafka.brokers"),
  "zookeeper.connect" -> conf.getString("kafka.zookeeper"),
  "group.id" -> conf.getString("kafka.consumergroups"),
  // Tuning values are passed in as command-line arguments.
  "auto.offset.reset" -> args(1),
  "enable.auto.commit" -> (conf.getString("kafka.autoCommit").toBoolean: java.lang.Boolean),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "security.protocol" -> "SASL_PLAINTEXT",
  "session.timeout.ms" -> args(2),
  "max.poll.records" -> args(3),
  "request.timeout.ms" -> args(4),
  "fetch.max.wait.ms" -> args(5))

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

After some processing, we commit the offsets using the commitAsync API:

try {
  messages.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
} catch {
  case e: Throwable => e.printStackTrace()
}

The job crashes with the following error:

            18/03/20 10:43:30 INFO ConsumerCoordinator: Revoking previously assigned partitions [TOPIC_NAME-3, TOPIC_NAME-5, TOPIC_NAME-4] for group 21_feb_reload_2
            18/03/20 10:43:30 INFO AbstractCoordinator: (Re-)joining group 21_feb_reload_2
            18/03/20 10:43:30 INFO AbstractCoordinator: (Re-)joining group 21_feb_reload_2
            18/03/20 10:44:00 INFO AbstractCoordinator: Successfully joined group 21_feb_reload_2 with generation 20714
            18/03/20 10:44:00 INFO ConsumerCoordinator: Setting newly assigned partitions [TOPIC_NAME-1, TOPIC_NAME-0, TOPIC_NAME-2] for group 21_feb_reload_2
            18/03/20 10:44:00 ERROR JobScheduler: Error generating jobs for time 1521557010000 ms
            java.lang.IllegalStateException: No current assignment for partition TOPIC_NAME-4
                at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
                at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
                at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
                at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
                at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
                at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
                at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
                at scala.Option.orElse(Option.scala:289)
                at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
                at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
                at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
                at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
                at scala.Option.orElse(Option.scala:289)
                at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
                at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
                at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
                at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
                at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
                at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
                at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
                at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
                at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
                at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
                at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
                at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
                at scala.util.Try$.apply(Try.scala:192)
                at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
                at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
                at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
                at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
                at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
            18/03/20 10:44:00 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalStateException: No current assignment for partition 

My findings:

1. A similar question was asked in this post: Kafka Spark Stream throws Exception: No current assignment for partition. It suggests using Assign instead of Subscribe, but does not explain much about why.
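For reference, the Assign-based approach from that post would look roughly like the sketch below. Assign pins the consumer to an explicit set of partitions, so the group coordinator never revokes or reassigns them; the `fromOffsets` map and the partition count here are assumptions for illustration (in practice the offsets would come from your own offset store):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// Hypothetical starting offsets, e.g. loaded from an external offset store.
val fromOffsets: Map[TopicPartition, Long] =
  (0 until 6).map(p => new TopicPartition("TOPIC_NAME", p) -> 0L).toMap

// With Assign there is no consumer-group rebalancing, so the
// "No current assignment for partition" path cannot be hit.
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](
    fromOffsets.keys.toList, kafkaParams, fromOffsets))
```

The trade-off is that you give up automatic partition distribution across consumers in the same group and must manage offsets and partition lists yourself.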

2. To make sure no rebalance occurs, I increased session.timeout.ms to almost my batch duration, since my processing finishes in under 2 minutes (the batch duration).

session.timeout.ms: the amount of time a consumer can be out of contact with the brokers while still being considered alive ( https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html )

3. I came across rebalance listeners with the methods (a) onPartitionsRevoked and (b) onPartitionsAssigned.

But I could not understand how to use the first one to commit offsets before a rebalance.
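For what it's worth, with a plain KafkaConsumer (not Spark's direct stream, whose driver-side consumer is managed internally by DirectKafkaInputDStream) a rebalance listener that commits in onPartitionsRevoked would look roughly like this sketch; `currentOffsets` is a hypothetical function supplied by your processing loop that returns the next offset to read per partition:

```scala
import java.util.{Collection => JCollection}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition

class CommitOnRevoke(consumer: KafkaConsumer[String, String],
                     currentOffsets: () => Map[TopicPartition, Long])
    extends ConsumerRebalanceListener {

  // Called *before* the rebalance takes partitions away, so this is
  // the last safe point to commit what has been processed so far.
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
    val toCommit = currentOffsets().map { case (tp, off) =>
      tp -> new OffsetAndMetadata(off)
    }.asJava
    consumer.commitSync(toCommit)
  }

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
}

// Registered when subscribing:
// consumer.subscribe(List("TOPIC_NAME").asJava,
//                    new CommitOnRevoke(consumer, () => offsets))
```

Note the synchronous commitSync here: during a revocation callback an async commit may not complete before the partitions are gone.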

Any input would be appreciated.

Best Answer

I ran into the same problem. It happened when two of my Spark jobs were using the same Kafka client.id. I assigned a new Kafka client.id to the other job, which fixed it.
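A minimal sketch of that fix, assuming both jobs build their Kafka params from a shared base map: give every job its own client.id so the broker can tell the consumers apart. The `paramsFor` helper, the job names, and the UUID suffix are illustrative assumptions, not part of the original code:

```scala
// Base Kafka params shared by both jobs (values are placeholders).
val baseParams: Map[String, Object] = Map(
  "bootstrap.servers" -> "broker1:9092",
  "group.id"          -> "21_feb_reload_2")

// Hypothetical helper: derive per-job params with a unique client.id.
def paramsFor(jobName: String): Map[String, Object] =
  baseParams + ("client.id" -> s"$jobName-${java.util.UUID.randomUUID()}")

val job1Params = paramsFor("reload-job-1")
val job2Params = paramsFor("reload-job-2")
```

The rest of the kafkaParams map stays as in the question; only the client.id differs per job.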

About apache-spark - Spark Streaming job fails after new partitions are assigned (old partitions revoked) for Kafka topic: No current assignment for partition topic1, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49452967/
