scala - Spark 流如果(!rdd.partitions.isEmpty)不工作

标签 scala apache-kafka spark-streaming kafka-consumer-api dstream

我正在尝试从 kafka 服务器创建一个 dStream,然后对该流进行一些转换。如果流为空 (if(!rdd.partitions.isEmpty)),我已经包含了一个 catch;然而,即使没有事件发布到 kafka 主题,也永远不会到达 else 语句。

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
    if(!rdd.partitions.isEmpty) {

        val message = rdd.map((x$2) => x$2._2).collect().toList.map(parser)

        val val = message(0)

    } else println("empty stream...")

    ssc.start() 
    ssc.awaitTermination()

}

在使用 KafkaUtils.createDirectStream 而不是 createStream 时,我应该使用替代语句来检查流是否为空吗?

最佳答案

使用 RDD.isEmpty 而不是 RDD.partitions.isEmpty ,它会添加一个检查以查看底层分区是否真的有元素:

stream.foreachRDD { rdd =>
  if(!rdd.isEmpty) {
    // Stuff
  }
}

RDD.partitions.isEmpty 不起作用的原因是 RDD 中存在一个分区,但该分区本身是空的。但是从 partitions 的角度来看,它是一个 Array[Partition],它不是空的。

关于scala - Spark 流如果(!rdd.partitions.isEmpty)不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40385468/

相关文章:

scala - 为什么 Scala 编译器拒绝没有前导空格的函数体?

apache-spark - 开发人员之间是否可以共享/访问hdfs?

scala - Scala 中的不可变数据结构

scala - spark DataFrame "as"方法的使用

postgresql - 查询模式下的kafka jdbc source connector错误

java - Apache Kafka - 关于主题/分区的 KafkaStream

apache-spark - 将数据从 dstream 写入 parquet

apache-spark - 如何在结构化流中将数据帧转换为 rdds?

apache-spark - 如何调整 "spark.rpc.askTimeout"?

scala - 将 Scala String 转换为 StringContext 并虚拟转发引用