我有 Spark 流应用程序,它运行一天结束并消耗上游应用程序发送的 kafka 事件。当前上游应用程序整天不断推送新数据,我的消费者最终会消耗它。我想根据截止时间(例如每天下午 6 点)限制消耗的事件。有没有办法指定截止时间来限制基于截止时间(例如 kafka 事件时间戳或其他内容)消耗的事件。下面是消费者代码
KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))
最佳答案
您可以根据时间戳或时间或任何字段过滤处理过程中的事件。例如,假设您的事件是 JSON,并且它有一个名为 hour 的字段,它是事件时间的小时值。您可以轻松地仅选择 6 点之前创建的事件,如下所示。
directStream.foreachRDD { rdd =>
val eventDfRDD = rdd.filter(record => {
val option = JSON.parseFull(record).get.asInstanceOf[Map[String, String]]
option.get("hour") < 1800
})
}
关于scala - 来自kafka的spark Streaming如何指定轮询事件的截止时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54151044/