map 内的Scala Spark过滤器

标签 scala apache-spark

我想在映射 RDD 时有效地过滤它。这可能吗?

这是我想做的伪代码:

for element in rdd:
    val opt = f(element)
    if (opt.nonEmpty) add_pair(opt.get, element)

这是在 Scala Spark 中实现伪代码的一种 hacky 方法:

rdd.map(element => (
    f(element).getOrElse(99),
    element
)).filter(tuple => tuple._1 != 99)

我无法找到干净的语法来执行此操作,因此我首先映射了所有元素,然后过滤掉了我不想要的元素。请注意,可能昂贵的调用 f(element) 只计算一次。如果我要在映射之前过滤元素(看起来更干净),那么我最终会调用 f 两次,这是低效的。

请不要将此标记为重复。虽然有类似的问题,但他们都没有真正回答这个问题。例如,this潜在重复将调用 f 两次,效率低下,因此不回答此问题。

最佳答案

你可以只使用 flatMap:

//let's say your f returns Some(x*2) for even number and None for odd
def f(n: Int): Option[Int] = if (n % 2) Some(n*2) else None 

val rdd = sc.parallelize(List(1,2,3,4))
rdd.flatMap(f) // 4,8

// rdd.flatMap(f) or rdd.flatMap(f(_)) or rdd.flatMap(e => f(e))

如果您需要进一步传递元组并进行过滤,则只需使用嵌套 map:

rdd.flatMap(e => f(e).map((_,e))) //(4,2),(8,4)

关于 map 内的Scala Spark过滤器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55839082/

相关文章:

apache-spark - PySpark 的第一个和最后一个函数一次完成一个分区

scala - Scala:构建特征和类的复杂层次结构

javascript - js.ThisFunction0的正确用法

java - 在 Java 中实例化 scala.Int

scala - 如何在Spark Shell中将s3与Apache Spark 2.2一起使用

python - 理解 Python 中 Spark MLlib 的 LinearRegressionWithSGD 示例有问题吗?

Java 8 - 将 Kairosdb 中的多个对象列表保存到 csv 文件中

scala - 在 Scala 中重用函数结果的好方法是什么

regex - 如何正确使用scala.util.matching.Regex?

scala - 使用 Spark 处理 txt 文件