apache-spark - Spark : commit kafka offsets on end of batch

标签 apache-spark apache-kafka

版本:

  • Spark 2.2
  • 卡夫卡 0.11

根据 documentation在 kafka 中提交偏移量我应该使用:

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

结果偏移量仅在下一批开始时提交。这会导致“持续”滞后。

是否有任何解决方法可以在当前批处理结束时提交偏移量(但仍使用相同的 kafka 组进行偏移量)?

滞后监控示例: enter image description here

最佳答案

Is there any workaround to commit offsets at the end of current batch

不是通过 commitAsync API。该方法调用所做的是将要提交的偏移量排队,然后在 DirectKafkaInputDStream.compute 期间执行异步提交:

override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  val untilOffsets = clamp(latestOffsets())

  // Create KafkaRDD and other irrelevant code

  currentOffsets = untilOffsets
  commitAll()
  Some(rdd)
}

commitAll 只是轮询由 commitAsync 填充的队列:

protected def commitAll(): Unit = {
  val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
  var osr = commitQueue.poll()
  while (null != osr) {
    val tp = osr.topicPartition
    val x = m.get(tp)
    val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
    m.put(tp, new OffsetAndMetadata(offset))
    osr = commitQueue.poll()
  }
  if (!m.isEmpty) {
    consumer.commitAsync(m, commitCallback.get)
  }
}

因此,不幸的是,如果您想将偏移量作为事务提交,您将不得不将它们单独存储在您自己的存储中,而不是使用 Kafka 内置的偏移量提交跟踪。

关于apache-spark - Spark : commit kafka offsets on end of batch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47990842/

相关文章:

json - PySpark:类型错误:col 应该是 Column

apache-spark - Apache Spark与Apache Hadoop

python - 带有 Python 消费者的 Docker Kafka

apache-kafka - Kafka 生产者在网络分区期间如何表现?

apache-kafka - 带有选择器的 Apache Kafka 客户端?

java - 使用汇合反序列化Apache Flink中的Avro

xml - 在 spark 中过滤数据框并保存为 avro

apache-spark - 在 Zeppelin 0.71 上运行的 Dataproc Spark 看不到在 Zeppelin 0.62 中创建的 Hive 表

java - 如何配置 ConfluenceRegistry 以使用不同的 Avro 架构源?

azure - 如何在Azure Spark集群上通过Apache Livy设置spark.driver.extraClassPath?