apache-spark - 如何在 spark 数据帧中的 ISIN 运算符中传递数据帧

标签 apache-spark hadoop apache-spark-sql bigdata

我想将具有一组值的数据框传递给新查询,但它失败了。

1) 在这里我选择了特定的列,这样我就可以在下一个查询中通过 ISIN

scala> val managerIdDf=finalEmployeesDf.filter($"manager_id"!==0).select($"manager_id").distinct
managerIdDf: org.apache.spark.sql.DataFrame = [manager_id: bigint]

2) 我的示例数据:

 scala> managerIdDf.show
    +----------+                                                                    
    |manager_id|
    +----------+
    |     67832|
    |     65646|
    |      5646|
    |     67858|
    |     69062|
    |     68319|
    |     66928|
    +----------+

3) 当我执行最终查询时它失败了:

scala> finalEmployeesDf.filter($"emp_id".isin(managerIdDf)).select("*").show
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.DataFrame [manager_id: bigint]  

我也试过转换为 ListSeq 但它只产生一个错误。如下所示,当我尝试转换为 Seq 并重新运行查询时,它会抛出错误:

scala> val seqDf=managerIdDf.collect.toSeq
seqDf: Seq[org.apache.spark.sql.Row] = WrappedArray([67832], [65646], [5646], [67858], [69062], [68319], [66928])

scala> finalEmployeesDf.filter($"emp_id".isin(seqDf)).select("*").show
java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray([67832], [65646], [5646], [67858], [69062], [68319], [66928])

我还提到了 this发布但徒劳无功。我正在尝试使用这种类型的查询来解决 spark 数据帧中的子查询。有人在吗?

最佳答案

使用数据帧和临时 View 以及 SPARK SQL 的自由格式 SQL 的替代方法 - 不要担心逻辑,它只是约定和初始方法的替代方法 - 这应该同样足够:

val df2 = Seq(
  ("Peter", "Doe", Seq(("New York", "A000000"), ("Warsaw", null))),
  ("Bob", "Smith", Seq(("Berlin", null))),
  ("John", "Jones", Seq(("Paris", null)))
).toDF("firstname", "lastname", "cities")

df2.createOrReplaceTempView("persons")

val res = spark.sql("""select * 
                         from persons 
                        where firstname
                       not in (select firstname
                                 from persons
                                where lastname <> 'Doe')""")

res.show

val list = List("Bob", "Daisy", "Peter")

val res2 = spark.sql("select firstname, lastname from persons")
                .filter($"firstname".isin(list:_*))

res2.show

val query = s"select * from persons where firstname in (${list.map ( x => "'" + x + "'").mkString(",") })"
val res3 = spark.sql(query)
res3.show

df2.filter($"firstname".isin(list: _*)).show

val list2 = df2.select($"firstname").rdd.map(r => r(0).asInstanceOf[String]).collect.toList
df2.filter($"firstname".isin(list2: _*)).show 

具体针对您的情况:

val seqDf=managerIdDf.rdd.map(r => r(0).asInstanceOf[Long]).collect.toList 2) 
finalEmployeesDf.filter($"emp_id".isin(seqDf: _)).select("").show

关于apache-spark - 如何在 spark 数据帧中的 ISIN 运算符中传递数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52613706/

相关文章:

java - Spark 错误 - 不支持的类文件主要版本

apache-spark - SparkSQL 和 UDT

scala - 将 csv 转换为 RDD

scala - 如何检查 DateType 列的值是否在指定日期范围内?

hadoop - spark 会自动缓存 rdds 吗?

pyspark - 有没有办法将超过 255 列加载到 Spark Dataframe 中?

hadoop - 负责管理Hadoop配置/服务集群的 Chef

hadoop - "merge"在 MapReduce 中是什么意思?

python - Pyspark Dataframe 从列中获取唯一元素,并将字符串作为元素列表

apache-spark - 将具有 UTC 偏移量的字符串转换为 Spark 时间戳