scala - 左反加入Spark?

标签 scala apache-spark

我已经定义了两个这样的表:

 val tableName = "table1"
    val tableName2 = "table2"

    val format = new SimpleDateFormat("yyyy-MM-dd")
      val data = List(
        List("mike", 26, true),
        List("susan", 26, false),
        List("john", 33, true)
      )
    val data2 = List(
        List("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
        List("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
        List("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
        List("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
        List("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
      )

      val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
      val rdd2 = sparkContext.parallelize(data2).map(Row.fromSeq(_))
      val schema = StructType(Array(
        StructField("name", StringType, true),
        StructField("age", IntegerType, true),
        StructField("isBoy", BooleanType, false)
      ))
    val schema2 = StructType(Array(
        StructField("name", StringType, true),
        StructField("grade", StringType, true),
        StructField("howold", IntegerType, true),
        StructField("hobby", StringType, true),
        StructField("birthday", DateType, false)
      ))

      val df = sqlContext.createDataFrame(rdd, schema)
      val df2 = sqlContext.createDataFrame(rdd2, schema2)
      df.createOrReplaceTempView(tableName)
      df2.createOrReplaceTempView(tableName2)

我正在尝试构建查询以从 table1 返回 table2 中没有匹配行的行。
我尝试使用此查询来做到这一点:
Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold AND table2.name IS NULL AND table2.howold IS NULL

但这只是给了我 table1 中的所有行:

List({"name":"john","age":33,"isBoy":true}, {"name":"susan","age":26,"isBoy":false}, {"name":"mike","age":26,"isBoy":true})



如何有效地在 Spark 中进行这种类型的连接?

我正在寻找 SQL 查询,因为我需要能够指定要在两个表之间进行比较的列,而不仅仅是像在其他推荐问题中那样逐行比较。就像使用减法,除了等。

最佳答案

您可以使用“左反”连接类型 - 使用 DataFrame API 或 SQL(DataFrame API 支持 SQL 支持的所有内容,包括您需要的任何连接条件):

数据帧 API:

df.as("table1").join(
  df2.as("table2"),
  $"table1.name" === $"table2.name" && $"table1.age" === $"table2.howold",
  "leftanti"
)

查询语句:
sqlContext.sql(
  """SELECT table1.* FROM table1
    | LEFT ANTI JOIN table2
    | ON table1.name = table2.name AND table1.age = table2.howold
  """.stripMargin)

注意 :还值得注意的是,使用元组和隐式 toDF 创建示例数据的方法更短、更简洁,无需单独指定架构。方法,然后在需要的地方“修复”自动推断的模式:
import spark.implicits._
val df = List(
  ("mike", 26, true),
  ("susan", 26, false),
  ("john", 33, true)
).toDF("name", "age", "isBoy")

val df2 = List(
  ("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
  ("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
).toDF("name", "grade", "howold", "hobby", "birthday").withColumn("birthday", $"birthday".cast(DateType))

关于scala - 左反加入Spark?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43186888/

相关文章:

python - Pyspark:如何读取谷歌存储桶中的 .csv 文件?

scala - 为什么Scala除了零参数方法还需要无参数方法?

scala - 如何在一个库中支持多个 Scala 版本

scala - 分组并在 spark sql 中获取第一个值

apache-spark - 无法将scala.collection.mutable.WrappedArray $ ofRef强制转换为Integer

scala - Spark如何与CPython互操作

scala - 找不到用于测试的现有数据 Scala Specs2

scala - scala_home homebrew 在 OSX 上安装在哪里?

scala - 为什么 Haskell 不需要蹦床?

java - scala 中的模式匹配。当参数表现出多态性或者是子类时,会有什么行为::