我目前开始学习在 Scala 中使用 Spark。我正在解决的问题需要我读取一个文件,将每一行拆分为某个字符,然后过滤其中一列与谓词匹配的行,最后删除一列。因此,基本的、简单的实现是一个映射,然后是一个过滤器,然后是另一个映射。
这意味着要浏览该集合 3 次,这对我来说似乎很不合理。所以我尝试用一个collect(将部分函数作为参数的collect)替换它们。令我惊讶的是,这使它运行速度慢得多。我在本地尝试了常规的 Scala 集合;正如预期的那样,后一种方法要快得多。
那这是为什么呢?我的想法是,map和filter和map不是顺序应用的,而是混合到一个操作中;换句话说,当一个操作强制评估时,将检查列表中的每个元素并执行待处理的操作。是对的吗 ?但即便如此,为什么收集性能这么差?
编辑:一个代码示例来展示我想要做的事情:
天真的方式:
sc.textFile(...).map(l => {
val s = l.split(" ")
(s(0), s(1))
}).filter(_._2.contains("hello")).map(_._1)
采集方式:
sc.textFile(...).collect {
case s if(s.split(" ")(0).contains("hello")) => s(0)
}
最佳答案
答案在于collect
的实现:
/**
* Return an RDD that contains all matching values by applying `f`.
*/
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
filter(cleanF.isDefinedAt).map(cleanF)
}
如您所见,它与 filter
->map
的顺序相同,但在您的情况下效率较低。
在 scala 中,PartialFunction
的 isDefinedAt
和 apply
方法都会评估 if
部分。
因此,在您的“collect”示例中,split
将为每个输入元素执行两次。
关于Spark 中 Scala 的收集效率低下?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36157253/