版本:
- Spark 2.2
- 卡夫卡 0.11
根据 documentation在 kafka 中提交偏移量我应该使用:
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
结果偏移量仅在下一批开始时提交。这会导致“持续”滞后。
是否有任何解决方法可以在当前批处理结束时提交偏移量(但仍使用相同的 kafka 组进行偏移量)?
最佳答案
Is there any workaround to commit offsets at the end of current batch
不是通过 commitAsync
API。该方法调用所做的是将要提交的偏移量排队,然后在 DirectKafkaInputDStream.compute
期间执行异步提交:
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
val untilOffsets = clamp(latestOffsets())
// Create KafkaRDD and other irrelevant code
currentOffsets = untilOffsets
commitAll()
Some(rdd)
}
commitAll
只是轮询由 commitAsync
填充的队列:
protected def commitAll(): Unit = {
val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
var osr = commitQueue.poll()
while (null != osr) {
val tp = osr.topicPartition
val x = m.get(tp)
val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
m.put(tp, new OffsetAndMetadata(offset))
osr = commitQueue.poll()
}
if (!m.isEmpty) {
consumer.commitAsync(m, commitCallback.get)
}
}
因此,不幸的是,如果您想将偏移量作为事务提交,您将不得不将它们单独存储在您自己的存储中,而不是使用 Kafka 内置的偏移量提交跟踪。
关于apache-spark - Spark : commit kafka offsets on end of batch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47990842/