scala - 创建 Spark Dataframe 的摘要

标签 scala apache-spark apache-spark-sql databricks azure-databricks

我有一个 Spark Dataframe,我正在尝试对其进行总结,以便找到过长的列:

// Set up test data
// Look for long columns (>=3), ie 1 is ok row,, 2 is bad on column 3, 3 is bad on column 2
val df = Seq(
    ( 1, "a", "bb", "cc", "file1" ),
    ( 2, "d", "ee", "fff", "file2" ),
    ( 3, "g", "hhhh", "ii", "file3" )
    ).
    toDF("rowId", "col1", "col2", "col3", "filename")

我可以总结列的长度并找到过长的列,如下所示:

// Look for long columns (>=3), ie 1 is ok row,, 2 is bad on column 3, 3 is bad on column 2
val df2 = df.columns
    .map(c => (c, df.agg(max(length(df(s"$c")))).as[String].first()))
    .toSeq.toDF("columnName", "maxLength")
    .filter($"maxLength" > 2)

如果我尝试将现有文件名列添加到 map 中,则会收到错误:

val df2 = df.columns
    .map(c => ($"filename", c, df.agg(max(length(df(s"$c")))).as[String].first()))
    .toSeq.toDF("fn", "columnName", "maxLength")
    .filter($"maxLength" > 2)

我尝试了 $"filename" 的一些变体句法。我怎样才能合并filename列到摘要中?

<表类=“s-表”> <标题> 列名 最大长度 文件名 <正文> col2 4 文件3 col3 3 文件2

真实的数据框有 300 多列和数百万行,因此我无法硬输入列名称。

最佳答案

@wBob 以下内容是否实现了您的目标?

  1. 按文件名分组并获取每列的最大值:
    val cols = df.columns.dropRight(1) // to remove the filename col
    val maxLength = cols.map(c => s"max(length(${c})) as ${c}").mkString(",")
    print(maxLength)
    df.createOrReplaceTempView("temp")
    val df1 = spark
      .sql(s"select filename, ${maxLength} from temp group by filename")
    df1.show()`

输出:

+--------+-----+----+----+----+
|filename|rowId|col1|col2|col3|
+--------+-----+----+----+----+
|   file1|    1|   1|   2|   2|
|   file2|    1|   1|   2|   3|
|   file3|    1|   1|   4|   2|
+--------+-----+----+----+----+
  • 使用子查询获取每列的最大值并使用 union 连接结果:
  •     df1.createOrReplaceTempView("temp2")
        val res = cols.map(col => {
          spark.sql(s"select '${col}' as columnName,  $col as maxLength, filename from temp2 " +
            s"where $col = (select max(${col}) from temp2)")
        }).reduce(_ union _)
        res.show()
    

    结果:

    +----------+---------+--------+
    |columnName|maxLength|filename|
    +----------+---------+--------+
    |     rowId|        1|   file1|
    |     rowId|        1|   file2|
    |     rowId|        1|   file3|
    |      col1|        1|   file1|
    |      col1|        1|   file2|
    |      col1|        1|   file3|
    |      col2|        4|   file3|
    |      col3|        3|   file2|
    +----------+---------+--------+
    

    请注意,rowIdcol1 有多个条目,因为最大值不唯一。

    可能有一种更优雅的方式来编写它,但我目前正在努力寻找一种方式。

    关于scala - 创建 Spark Dataframe 的摘要,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72281639/

    相关文章:

    ruby - 有没有比使用自定义 case 语句更实用的方法来用 Ruby 编写这个?

    java - Spark Java - 无法获取 java.lang.String 的正确类标签

    scala - 如何在 Spark 1.5 中转置数据帧(没有可用的枢轴运算符)?

    apache-spark - 在 Spark SQL 中编写 SQL 与使用 Dataframe API

    scala - 避免 Scala 内存泄漏 - Scala 构造函数

    scala - 在 Scala 中创建 SparkSession 时出现 java.lang.InterruptedException

    scala - 如何收集错误是scala,然后将所有错误汇总在一起

    scala - 线程 "main"中的异常 java.lang.NoSuchMethodError : scala. Product.$init$(Lscala/Product;)

    python - Spark VectorAssembler 错误 - PySpark 2.3 - Python

    scala - 异常线程 "main"scala.MatchError :Map() (of class org. apache.spark.sql.catalyst.util.CaseInsensitiveMap)