Spark 中 Scala 的收集效率低下?

标签 scala apache-spark

我目前开始学习在 Scala 中使用 Spark。我正在解决的问题需要我读取一个文件,将每一行拆分为某个字符,然后过滤其中一列与谓词匹配的行,最后删除一列。因此,基本的、简单的实现是一个映射,然后是一个过滤器,然后是另一个映射。

这意味着要浏览该集合 3 次,这对我来说似乎很不合理。所以我尝试用一​​个collect(将部分函数作为参数的collect)替换它们。令我惊讶的是,这使它运行速度慢得多。我在本地尝试了常规的 Scala 集合;正如预期的那样,后一种方法要快得多。

那这是为什么呢?我的想法是,map和filter和map不是顺序应用的,而是混合到一个操作中;换句话说,当一个操作强制评估时,将检查列表中的每个元素并执行待处理的操作。是对的吗 ?但即便如此,为什么收集性能这么差?

编辑:一个代码示例来展示我想要做的事情:

天真的方式:

sc.textFile(...).map(l => {
  val s = l.split(" ") 
  (s(0), s(1))
}).filter(_._2.contains("hello")).map(_._1)

采集方式:

sc.textFile(...).collect {
  case s if(s.split(" ")(0).contains("hello")) => s(0)
}

最佳答案

答案在于collect的实现:

/**
 * Return an RDD that contains all matching values by applying `f`.
 */
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  filter(cleanF.isDefinedAt).map(cleanF)
}

如您所见,它与 filter->map 的顺序相同,但在您的情况下效率较低。

在 scala 中,PartialFunctionisDefinedAtapply 方法都会评估 if 部分。

因此,在您的“collect”示例中,split 将为每个输入元素执行两次。

关于Spark 中 Scala 的收集效率低下?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36157253/

相关文章:

Scala 集合元组而不是元组集合

python - 如何将键值对减少为键和值列表?

apache-spark - PySpark 中的 CrossValidator 是否分发执行?

apache-spark - Spark saveAsTable 的位置位于 s3 存储桶的根本原因 NullPointerException

scala - 做某事最多 N 次或直到在 Scala 中满足条件

Scala mergeMsg 与 def

scala - 优化对不可变对象(immutable对象)的冗余调用

Scala - 如何创建可用于类型构造函数的单个隐式

scala - 如何计算流数据集中数组字段中元素的数量(除了一个)?

apache-spark - Spark流:文本数据源仅支持一列