scala - Spark - 在不打开流的情况下获取 Kafka 的最早和最新偏移量

标签 scala apache-spark apache-kafka

我目前正在使用 spark-streaming-kafka-0-10_2.11将我的 spark 应用程序与 kafka 队列连接起来。对于 Streams,一切正常。但是对于特定场景,我只需要一次 kafka 队列的全部内容 - 为此我得到了更好地使用 KafkaUtils.createRDD 的建议。 ( SparkStreaming: Read Kafka Stream and provide it as RDD for further processing )

然而对于 spark-streaming-kafka-0-10_2.11我无法弄清楚如何为我的 Kafka 主题获取最早和最新的偏移量,这是创建偏移范围所需的我必须处理的 createRDD方法。

在不打开流的情况下获得这些偏移量的推荐方法是什么?任何帮助将不胜感激。

最佳答案

在阅读了几次讨论后,我能够从特定分区获得最早或最新的偏移量:

val consumer = new SimpleConsumer(host,port,timeout,bufferSize,"offsetfetcher");
val topicAndPartition = new TopicAndPartition(topic, initialPartition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime,1)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

return offsets.head

但是,如何在 kafka_consumer.sh CLI 命令中复制“from_beginning”的行为是我不知道的 KafkaUtils.createRDD 方法。

关于scala - Spark - 在不打开流的情况下获取 Kafka 的最早和最新偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44755649/

相关文章:

java - 如何从可变 Java 映射构建 Scala 不可变映射?

apache-spark - Spark mllib 打乱数据

postgresql - Kafka Connect JDBC Sink quote.sql.identifiers 不工作

apache-kafka - 使用 Kafka 将 Web 层与业务逻辑代码分离

apache-kafka - 生产中的Kafka UI监控工具

java - 按两个条件拆分字符串

java - 使用 Scala 解析 Java 源代码

scala - 有没有办法忽略不匹配的情况?

python - 当 pyspark 中两列的值是相同组合时删除行

apache-spark - datastax - 无法在 Spark 提交上连接到 DSE 资源管理器