apache-spark - 如何在 Spark scala 的窗口 PartitionBy 中应用多个列

标签 apache-spark

val partitionsColumns = "idnum,monthnum"
val partitionsColumnsList = partitionsColumns.split(",").toList
val loc = "/data/omega/published/invoice"
val df = sqlContext.read.parquet(loc)
val windowFunction = Window.partitionBy  (partitionsColumnsList:_*).orderBy(df("effective_date").desc)
<console>:38: error: overloaded method value partitionBy with alternatives:
(cols: org.apache.spark.sql.Column*)     org.apache.spark.sql.expressions.WindowSpec <and>
(colName: String,colNames: String*)org.apache.spark.sql.expressions.WindowSpec
cannot be applied to (String)
val windowFunction = Window.partitionBy(partitionsColumnsList:_*).orderBy(df("effective_date").desc)

是否可以将列列表发送到 partitionBy 方法 Spark/Scala?

我已经实现了将一列传递给 partitionBy 方法,该方法有效。我不知道如何将多个列传递给 partitionBy 方法

基本上我想将 List(Columns) 传递给 partitionBy 方法

Spark 版本为 1.6。

最佳答案

Window.partitionBy具有以下定义:

static WindowSpec partitionBy(Column... cols) 

Creates a WindowSpec with the partitioning defined.

static WindowSpec partitionBy(scala.collection.Seq<Column> cols)

Creates a WindowSpec with the partitioning defined.

static WindowSpec partitionBy(String colName, scala.collection.Seq<String> colNames) 

Creates a WindowSpec with the partitioning defined.

static WindowSpec partitionBy(String colName, String... colNames)

Creates a WindowSpec with the partitioning defined.

以你的例子,

val partitionsColumnsList = partitionsColumns.split(",").toList

你可以像这样使用它:

Window.partitionBy(partitionsColumnsList.map(col(_)):_*).orderBy(df("effective_date").desc)

或者

Window.partitionBy(partitionsColumnsList.head, partitionsColumnsList.tail _* ).orderBy(df("effective_date").desc)

关于apache-spark - 如何在 Spark scala 的窗口 PartitionBy 中应用多个列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56626446/

相关文章:

scala - 如何使用循环在 Spark-Scala 的 HDFS 中迭代多个文本文件?

apache-spark - Spark "Failed to construct kafka consumer"通过 SSL

scala - 如何在 Spark 中转置 RDD

apache-spark - Kubernetes 上的 SparkPi - 找不到或加载主类?

java - 使用 Maven 构建 Spark : error finding javac (but path is correct)

hadoop - 批处理模式中的 livy 抛出错误 Error : Only local python files are supported: Parsed arguments

scala - 为什么 "java.lang.ClassNotFoundException: Failed to find data source: kinesis"具有 spark-streaming-kinesis-asl 依赖性?

scala - 如何将正则表达式解析为整个 spark 数据框而不是每一列?

apache-spark - 使用 Hbase 进行 Spark Streaming

apache-spark - 从AWS S3读取pyspark文件不起作用