我的问题不是 [ Joining Spark Dataframes with "isin" operator 的重复问题。我的问题是关于“不在”,而不是“在”。这是不同的!
我有两个Dataset
s:
-
userProfileDataset
:Dataset[UserProfile]
-
jobModelsDataset
:Dataset[JobModel]
案例类UserProfile
定义为
case class UserProfile(userId: Int, visitedJobIds: Array[Int])
和案例类别 JobModel
定义为
case class JobModel(JobId: Int, Model: Map[String, Double])
我还创建了两个对象( UserProfileFieldNames
和 JobModelFieldNames
),其中包含这两个案例类的字段名称。
我的目标是,对于 userProfileDataset
中的每个用户,找到JobModel.JobId
不包含在 UserProfile.visitedJobIds
中。
如何做到这一点?
我考虑过使用 crossJoin
然后filter
。它可能会起作用。有没有更直接、更有效的方法?
我尝试过以下方法,但都不起作用:
val result = userProfileDataset.joinWith(jobModelsDataset,
!userProfileDataset.col(UserProfileFieldNames.visitedJobIds).contains(jobModelsDataset.col(JobModelFieldNames.jobId)),
"left_outer"
)
它导致:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'contains(
_1
.visitedJobIds
, CAST(_2
.JobId
AS STRING))' due to data type mismatch: argument 1 requires string type, however, '_1
.visitedJobIds
' is of array type.;;
难道是因为 contains
方法只能用于测试一个字符串是否包含另一个字符串?
以下条件也不起作用:
!jobModelsDataset.col(JobModelFieldNames.jobId)
.isin(userProfileDataset.col(UserProfileFieldNames.visitedJobIds))
它导致:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '(
_2
.JobId
IN (_1
.visitedJobIds
))' due to data type mismatch: Arguments must be same type but were: IntegerType != ArrayType(IntegerType,false);; 'Join LeftOuter, NOT _2#74.JobId IN (_1#73.visitedJobIds)
最佳答案
如果唯一的job id数量不是太多,那么您可以按如下方式收集和广播它们
val jobIds = jobModelsDataset.map(_.JobId).distinct.collect().toSeq
val broadcastedJobIds = spark.sparkContext.broadcast(jobIds)
要将此广播序列与 visitedJobIds
列进行比较,您可以创建一个 UDF
val notVisited = udf((visitedJobs: Seq[Int]) => {
broadcastedJobIds.value.filterNot(visitedJobs.toSet)
})
val df = userProfileDataset.withColumn("jobsToDo", notVisited($"visitedJobIds"))
使用 jobIds = 1,2,3,4,5
和示例数据帧进行测试
+------+---------------+
|userId| visitedJobIds|
+------+---------------+
| 1| [1, 2, 3]|
| 2| [3, 4, 5]|
| 3|[1, 2, 3, 4, 5]|
+------+---------------+
将给出最终数据框
+------+---------------+--------+
|userId| visitedJobIds|jobsToDo|
+------+---------------+--------+
| 1| [1, 2, 3]| [4, 5]|
| 2| [3, 4, 5]| [1, 2]|
| 3|[1, 2, 3, 4, 5]| []|
+------+---------------+--------+
关于scala - Spark : How to join two `Dataset` s A and B with the condition that an ID array column of A does NOT contain the ID column of B?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48636095/