scala - Spark : How to join two `Dataset` s A and B with the condition that an ID array column of A does NOT contain the ID column of B?

标签 scala apache-spark join apache-spark-dataset

我的问题不是 [ Joining Spark Dataframes with "isin" operator 的重复问题。我的问题是关于“不在”,而不是“在”。这是不同的!

我有两个Dataset s:

  • userProfileDataset :Dataset[UserProfile]
  • jobModelsDataset :Dataset[JobModel]

案例类UserProfile定义为

case class UserProfile(userId: Int, visitedJobIds: Array[Int])

和案例类别 JobModel定义为

case class JobModel(JobId: Int, Model: Map[String, Double])

我还创建了两个对象( UserProfileFieldNamesJobModelFieldNames ),其中包含这两个案例类的字段名称。

我的目标是,对于 userProfileDataset 中的每个用户,找到JobModel.JobId不包含在 UserProfile.visitedJobIds如何做到这一点?

我考虑过使用 crossJoin然后filter 。它可能会起作用。有没有更直接、更有效的方法?


我尝试过以下方法,但都不起作用:

val result = userProfileDataset.joinWith(jobModelsDataset,
      !userProfileDataset.col(UserProfileFieldNames.visitedJobIds).contains(jobModelsDataset.col(JobModelFieldNames.jobId)),
      "left_outer"
    )

它导致:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'contains(_1.visitedJobIds, CAST(_2.JobId AS STRING))' due to data type mismatch: argument 1 requires string type, however, '_1.visitedJobIds' is of array type.;;

难道是因为 contains方法只能用于测试一个字符串是否包含另一个字符串?

以下条件也不起作用:

!jobModelsDataset.col(JobModelFieldNames.jobId)
  .isin(userProfileDataset.col(UserProfileFieldNames.visitedJobIds))

它导致:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '(_2.JobId IN (_1.visitedJobIds))' due to data type mismatch: Arguments must be same type but were: IntegerType != ArrayType(IntegerType,false);; 'Join LeftOuter, NOT _2#74.JobId IN (_1#73.visitedJobIds)

最佳答案

如果唯一的job id数量不是太多,那么您可以按如下方式收集和广播它们

val jobIds = jobModelsDataset.map(_.JobId).distinct.collect().toSeq
val broadcastedJobIds = spark.sparkContext.broadcast(jobIds)

要将此广播序列与 visitedJobIds 列进行比较,您可以创建一个 UDF

val notVisited = udf((visitedJobs: Seq[Int]) => { 
  broadcastedJobIds.value.filterNot(visitedJobs.toSet)
})

val df = userProfileDataset.withColumn("jobsToDo", notVisited($"visitedJobIds"))

使用 jobIds = 1,2,3,4,5 和示例数据帧进行测试

+------+---------------+
|userId|  visitedJobIds|
+------+---------------+
|     1|      [1, 2, 3]|
|     2|      [3, 4, 5]|
|     3|[1, 2, 3, 4, 5]|
+------+---------------+

将给出最终数据框

+------+---------------+--------+
|userId|  visitedJobIds|jobsToDo|
+------+---------------+--------+
|     1|      [1, 2, 3]|  [4, 5]|
|     2|      [3, 4, 5]|  [1, 2]|
|     3|[1, 2, 3, 4, 5]|      []|
+------+---------------+--------+

关于scala - Spark : How to join two `Dataset` s A and B with the condition that an ID array column of A does NOT contain the ID column of B?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48636095/

相关文章:

r - 合并来自不同数据框的列

mysql - 在mysql中的同一个表中加入子查询

c# - Linq 左外连接与自定义比较器

scala - 到 StringOps 的隐式转换未在隐式 val 函数体内应用

scala - RxScala Observable 永远不会运行

python - 如何在 Spark Dataframe 中按组/分区重命名列?

scala - 如何使用平面图分解数据集?

hadoop - 使用 --master yarn-cluster : issue with spark-assembly 运行 spark-submit

java - 如何将 Scala 特征转换为 Java 类?

scala - Actor 可以在特定条件下读取消息吗?