scala - Spark 数据集 API - 加入

标签 scala apache-spark apache-spark-sql apache-spark-dataset

我正在尝试使用 Spark Dataset API 但我在做一个简单的连接时遇到了一些问题。

假设我有两个带字段的数据集:date | value ,然后在 DataFrame 的情况下我的加入看起来像:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

然而对于 Dataset.joinWith方法,但同样的方法不起作用:
val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )
.joinWith 所需的参数是什么? ?

最佳答案

使用 joinWith您首先必须创建一个 DataSet ,而且很可能是其中两个。创建一个 DataSet ,您需要创建一个与您的架构匹配的案例类并调用 DataFrame.as[T]哪里T是你的案例类。所以:

case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

您还可以跳过案例类并使用元组:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

然后,如果您有另一个案例类/DF,就像这样说:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

然后,而 join 的语法和 joinWith相似,结果不同:
df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

如您所见,joinWith将对象作为元组的一部分完好无损,而 join将列展平为单个命名空间。 (在上述情况下会导致问题,因为列名“key”重复了。)

奇怪的是,我必须使用 df.col("key")df2.col("key")为加盟创造条件dsds2 -- 如果你只使用 col("key")在任何一侧它都不起作用,并且 ds.col(...)不存在。使用原版df.col("key")然而,有诀窍。

关于scala - Spark 数据集 API - 加入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36462674/

相关文章:

scala - Spark通用案例类到结构类型

scala - 为什么这个递归函数不起作用

apache-spark - Spark 纱:问的容器太多

scala - 在 'spark.sql.hive.metastore.jars' 中设置属性 'spark-defaults.conf' 后出现异常

scala - 简洁明了的 Scala HTTP 客户端库

scala - Salat 没有反序列化映射到 Option[Set[A]] 的集合

java - "main"java.lang.ClassCastException : [Lscala. Tuple2;无法在 Spark MLlib LDA 中转换为 scala.Tuple2

scala - Spark/Scala - 项目从 IntelliJ 运行良好,但在 SBT 时引发错误

python - pyspark,比较数据框中的两行

apache-spark - RDD/Dataframe 的分区位置