scala - 来自kafka的spark Streaming如何指定轮询事件的截止时间

标签 scala apache-spark apache-kafka spark-streaming kafka-consumer-api

我有 Spark 流应用程序,它运行一天结束并消耗上游应用程序发送的 kafka 事件。当前上游应用程序整天不断推送新数据,我的消费者最终会消耗它。我想根据截止时间(例如每天下午 6 点)限制消耗的事件。有没有办法指定截止时间来限制基于截止时间(例如 kafka 事件时间戳或其他内容)消耗的事件。下面是消费者代码

  KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))

最佳答案

您可以根据时间戳或时间或任何字段过滤处理过程中的事件。例如,假设您的事件是 JSON,并且它有一个名为 hour 的字段,它是事件时间的小时值。您可以轻松地仅选择 6 点之前创建的事件,如下所示。

directStream.foreachRDD { rdd =>
        val eventDfRDD = rdd.filter(record => {
          val option = JSON.parseFull(record).get.asInstanceOf[Map[String, String]]
          option.get("hour") < 1800
        })
      }

关于scala - 来自kafka的spark Streaming如何指定轮询事件的截止时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54151044/

相关文章:

scala - 如果函数可以是不同的类型,单子(monad)规则将如何应用

java - 如何将 FP-Growth 模型FrequentItemSet 结果保存到文本文件中?

python - 如何使用 Python 在 Kafka 中生成墓碑 Avro 记录?

apache-spark - SBT 组装 jar 排除

apache-kafka - KafkaRDD scala 最小示例

apache-kafka - 单个消费者可以从 kafka 主题的多个分区中读取吗?

java - 使用 JMeter/Java 上下文的 Scala JSR223 脚本

scala - Spark 2.2 非法模式组件 : XXX java. lang.IllegalArgumentException:非法模式组件:XXX

Scala 构造函数重载?

apache-spark - 具有多个执行器的 Spark 独立配置