我有一个 DataFrame,需要将其写入我们的自定义数据存储中。此数据存储要求根据特定列的 (account_id) 值将数据分组写入。 例如。给定此数据:
account_id | date | value
1 | 2023-01-01 | 1
1 | 2023-01-02 | 2
2 | 2023-01-01 | 3
我需要将 account_id 1 的组与 account_id 2 的组分开处理。
我尝试了这个(在 Scala 中)使用
df.repartition(df.col("account_id")).foreachPartition(processPartition)
我预计单元测试中的模拟数据存储会收到 2 个调用,一个调用包含 account_id 1 的 2 行,另一个调用包含帐户 id 2 的一行,但它只收到包含所有 3 行的单个写入调用。这就好像 Spark 忽略了我的分区要求。
阅读本文后,我的印象是分区纯粹是一种性能工具(用于控制并行性的使用方式),并且 Spark 在这里决定甚至没有理由分割小帧。但是,我找不到任何文档可以清楚地说明这一点。我的问题是,就我而言,这与性能无关,而是与正确性有关。我也找不到任何文档告诉我该怎么做。我遇到了 DataFrameWriter 的partitionBy,但是这个文档都讨论磁盘上的文件,而我需要使用我已经编写的客户端库写入我们的自定义数据存储。
最后,我在 StackOverflow 上发现了一些类似的问题,但他们明确提到 DataFrame 很小,性能不是问题,因此接受的答案是首先获取不同值的列表,然后迭代该列表并创建新的 DataFrames 过滤这些值 ala account_ids.map(id => (df.where(($"account_id"=== id))))
但这会破坏 Spark 的隐式并行性。
最佳答案
分区可保证具有相同 account_id
的所有帐户最终位于同一分区中,但不能保证单个分区仅包含单个 account_id
。通常,您的 account_id
数会多于分区数,因此 Spark 没有机会为每个 account_id
创建单个分区。
如果您在调用repartition
时省略分区数量,Spark将使用默认值(spark.sql.shuffle.partitions
)作为分区数量。
但是,您可以按键对每个分区中的行进行排序(不需要额外的改组),然后单独处理每个键:
//create testdata with 8 keys distributed over 5 partitions
val df = spark.createDataset(for( x <- 1 to 100 ) yield (x%8, x))
.toDF("key", "value")
.repartition(5, 'key)
//the method handling the data store
def doSomething(in:List[Row]): Unit = {
if( in.length > 0 ) {
println(in) //replace this line with whatever code should work only on a single account_id
}
}
//processing the partitions in parallel
df.sortWithinPartitions("key")
.foreachPartition{it:Iterator[Row] => {
var curKey:Option[Int] = Option.empty
var curList: ListBuffer[Row] = ListBuffer()
while(it.hasNext) {
val row = it.next()
val k = row.getAs[Int]("key")
if( curKey == Some(k) ) {
curList += row
}
else {
doSomething(curList.toList)
curList = ListBuffer()
curList += row
curKey = Some(k)
}
}
doSomething(curList.toList)
}
}
输出:
List([1,1], [1,9], [1,17], [1,25], [1,33], [1,41], [1,49], [1,57], [1,65], [1,73], [1,81], [1,89], [1,97])
List([2,2], [2,10], [2,18], [2,26], [2,34], [2,42], [2,50], [2,58], [2,66], [2,74], [2,82], [2,90], [2,98])
List([6,6], [6,14], [6,22], [6,30], [6,38], [6,46], [6,54], [6,62], [6,70], [6,78], [6,86], [6,94])
List([0,8], [0,16], [0,24], [0,32], [0,40], [0,48], [0,56], [0,64], [0,72], [0,80], [0,88], [0,96])
List([3,3], [3,11], [3,19], [3,27], [3,35], [3,43], [3,51], [3,59], [3,67], [3,75], [3,83], [3,91], [3,99])
List([5,5], [5,13], [5,21], [5,29], [5,37], [5,45], [5,53], [5,61], [5,69], [5,77], [5,85], [5,93])
List([4,4], [4,12], [4,20], [4,28], [4,36], [4,44], [4,52], [4,60], [4,68], [4,76], [4,84], [4,92], [4,100])
List([7,7], [7,15], [7,23], [7,31], [7,39], [7,47], [7,55], [7,63], [7,71], [7,79], [7,87], [7,95])
输出中的每一行都是通过对 doSomething
的一次调用创建的,并且仅包含具有唯一键的行,同时仍然并行处理分区。
关于apache-spark - 如何按列的值分组处理 Spark DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76598813/