apache-spark - 如何按列的值分组处理 Spark DataFrame

标签 apache-spark apache-spark-sql

我有一个 DataFrame,需要将其写入我们的自定义数据存储中。此数据存储要求根据特定列的 (account_id) 值将数据分组写入。 例如。给定此数据:

account_id | date       | value
         1 | 2023-01-01 | 1
         1 | 2023-01-02 | 2
         2 | 2023-01-01 | 3

我需要将 account_id 1 的组与 account_id 2 的组分开处理。

我尝试了这个(在 Scala 中)使用 df.repartition(df.col("account_id")).foreachPartition(processPartition)

我预计单元测试中的模拟数据存储会收到 2 个调用,一个调用包含 account_id 1 的 2 行,另一个调用包含帐户 id 2 的一行,但它只收到包含所有 3 行的单个写入调用。这就好像 Spark 忽略了我的分区要求。

阅读本文后,我的印象是分区纯粹是一种性能工具(用于控制并行性的使用方式),并且 Spark 在这里决定甚至没有理由分割小帧。但是,我找不到任何文档可以清楚地说明这一点。我的问题是,就我而言,这与性能无关,而是与正确性有关。我也找不到任何文档告诉我该怎么做。我遇到了 DataFrameWriter 的partitionBy,但是这个文档都讨论磁盘上的文件,而我需要使用我已经编写的客户端库写入我们的自定义数据存储。

最后,我在 StackOverflow 上发现了一些类似的问题,但他们明确提到 DataFrame 很小,性能不是问题,因此接受的答案是首先获取不同值的列表,然后迭代该列表并创建新的 DataFrames 过滤这些值 ala account_ids.map(id => (df.where(($"account_id"=== id)))) 但这会破坏 Spark 的隐式并行性。

最佳答案

分区可保证具有相同 account_id 的所有帐户最终位于同一分区中,但不能保证单个分区仅包含单个 account_id。通常,您的 account_id 数会多于分区数,因此 Spark 没有机会为每个 account_id 创建单个分区。 如果您在调用repartition时省略分区数量,Spark将使用默认值(spark.sql.shuffle.partitions)作为分区数量。

但是,您可以按键对每个分区中的行进行排序(不需要额外的改组),然后单独处理每个键:

//create testdata with 8 keys distributed over 5 partitions
val df = spark.createDataset(for( x <- 1 to 100 ) yield (x%8, x))
  .toDF("key", "value")
  .repartition(5, 'key)

//the method handling the data store
def doSomething(in:List[Row]): Unit = {
  if( in.length > 0 ) {
    println(in) //replace this line with whatever code should work only on a single account_id
  }
}

//processing the partitions in parallel
df.sortWithinPartitions("key")
  .foreachPartition{it:Iterator[Row] => {
    var curKey:Option[Int] = Option.empty
    var curList: ListBuffer[Row] = ListBuffer()
    while(it.hasNext) {
      val row = it.next()
      val k = row.getAs[Int]("key")
      if( curKey == Some(k) ) {
        curList += row
      }
      else {
        doSomething(curList.toList)
        curList = ListBuffer()
        curList += row
        curKey = Some(k)
      }
    }
    doSomething(curList.toList)
  }
}

输出:

List([1,1], [1,9], [1,17], [1,25], [1,33], [1,41], [1,49], [1,57], [1,65], [1,73], [1,81], [1,89], [1,97])
List([2,2], [2,10], [2,18], [2,26], [2,34], [2,42], [2,50], [2,58], [2,66], [2,74], [2,82], [2,90], [2,98])
List([6,6], [6,14], [6,22], [6,30], [6,38], [6,46], [6,54], [6,62], [6,70], [6,78], [6,86], [6,94])
List([0,8], [0,16], [0,24], [0,32], [0,40], [0,48], [0,56], [0,64], [0,72], [0,80], [0,88], [0,96])
List([3,3], [3,11], [3,19], [3,27], [3,35], [3,43], [3,51], [3,59], [3,67], [3,75], [3,83], [3,91], [3,99])
List([5,5], [5,13], [5,21], [5,29], [5,37], [5,45], [5,53], [5,61], [5,69], [5,77], [5,85], [5,93])
List([4,4], [4,12], [4,20], [4,28], [4,36], [4,44], [4,52], [4,60], [4,68], [4,76], [4,84], [4,92], [4,100])
List([7,7], [7,15], [7,23], [7,31], [7,39], [7,47], [7,55], [7,63], [7,71], [7,79], [7,87], [7,95])

输出中的每一行都是通过对 doSomething 的一次调用创建的,并且仅包含具有唯一键的行,同时仍然并行处理分区。

关于apache-spark - 如何按列的值分组处理 Spark DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76598813/

相关文章:

scala - 出现类似需要结构类型的错误,但在 spark scala 中得到了简单结构类型的字符串

scala - 如何定义DataFrame的分区?

python - PySpark DataFrame - 动态加入多列

python - scala:将元组引用传递给函数

amazon-ec2 - 我应该将我的 AWS 访问 key 添加到哪个 core-site.xml?

python - 如何检查pyspark数据框中的字符串列是否全部为数字

apache-spark - dropDuplicates运算符使用哪一行?

apache-spark - Spark 读取 CSV - 不显示损坏的记录

scala - Spark清理shuffle溢出到磁盘

java - 获取 Spark 数据集中嵌套数组的最小值