scala - 如何在给定多个条件的情况下对 Spark 数据帧执行 "Lookup"操作

标签 scala apache-spark dataframe apache-spark-sql lookup

我是 Spark 的新手(我的版本是 1.6.0),现在我正在尝试解决下面给出的问题:

假设有两个源文件:

  • 第一个(简称 A)是一个大列,包含名为 A1、B1、C1 的列和其他 80 列。里面有230K条记录。
  • 第二个(简称 B)是一个小型查找表,其中包含名为 A2、B2、C2 和 D2 的列。里面有250条记录。

现在我们需要在 A 中插入一个新列,逻辑如下:

  • 首先在B中查找A1、B1、C1(对应列为A2、B2、C2),如果成功,则返回D2作为新添加列的值。如果什么也没找到...
  • 然后在B中查找A1、B1。如果成功,则返回D2。如果什么也没找到...
  • 设置默认值“NA”

我已经读入文件并将它们转换为数据帧。对于第一种情况,我通过左外部将它们连接在一起得到了结果。但下一步我找不到好的方法。

我当前的尝试是通过使用不太严格的条件连接 A 和 B 来构建新的数据框。但是我不知道如何从另一个数据帧更新当前数据帧。或者还有其他更直观、更有效的方法来解决整个问题吗?

感谢您的所有回答。

----------------------------20160309更新---------------------------- -----------------

终于接受了@mlk的回答。仍然非常感谢 @zero323 对 UDF 与 join 的精彩评论,Tungsten 代码生成确实是我们现在面临的另一个问题。但由于我们需要进行多次查找,并且每次查找平均 4 个条件,因此前一种解决方案更合适......

最终的解决方案看起来像下面的代码片段:

```
import sqlContext.implicits._
import com.github.marklister.collections.io._

case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
  (aStr: String, bStr: String, cStr: String) =>
    tableBroadcast.value.find {
      case TableType(a, b, c, _) =>
        (a == aStr && b == bStr && c == cStr) ||
        (a == aStr && b == bStr)
    }.getOrElse(TableType("", "", "", "NA")).D
}
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```

最佳答案

由于 B 很小,我认为最好的方法是使用广播变量和用户定义的函数。

// However you get the data...
case class BType( A2: Int, B2: Int, C2 : Int, D2 : String)
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200"))

val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER")


// Broadcast B so all nodes have a copy of it.
val Bbradcast = sc.broadcast(B)

// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {( a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 }

// Use the UDF in a select
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show

关于scala - 如何在给定多个条件的情况下对 Spark 数据帧执行 "Lookup"操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35719328/

相关文章:

Java 使用 Apache Spark 指定架构从 json 文件读取

python - 如何使用 Python 清理 Excel 文件中的文本?

python - 文件不是 zip 文件错误,但我没有打开 zip 文件

r - 创建一个非正统的虚拟变量

scala - Aux 模式在 Scala 中完成了什么?

scala - play框架中的错误处理

Scala 映射和/或 groupby 函数

scala - 在 akka 2.x 中,root actor 是否由其他人监督?

apache-spark - sparksql 删除配置单元表

apache-spark - Spark 2.3 java.lang.NoSuchMethodError : io.netty.buffer.PooledByteBufAllocator.metric