给定一个 Spark DataFrame,如下所示:
==================================
| Name | Col1 | Col2 | .. | ColN |
----------------------------------
| A | 1 | 11 | .. | 21 |
| A | 31 | 41 | .. | 51 |
| B | 2 | 12 | .. | 22 |
| B | 32 | 42 | .. | 52 |
==================================
我想运行逻辑,对与特定 Name
值相对应的表分区执行聚合/计算。所述逻辑要求分区的全部内容——并且仅该分区——在执行逻辑的节点的内存中具体化;它看起来类似于下面的 processSegment
函数:
def processDataMatrix(dataMatrix):
# do some number crunching on a 2-D matrix
def processSegment(dataIter):
# "running" value of the Name column in the iterator
dataName = None
# as the iterator is processed, put the data in a matrix
dataMatrix = []
for dataTuple in dataIter:
# separate the name column from the other columns
(name, *values) = dataTuple
# SANITY CHECK: ensure that all rows have same name
if (dataName is None):
dataName = name
else:
assert (dataName == name), 'row name ' + str(name) + ' does not match expected ' + str(dataName)
# put the row in the matrix
dataMatrix.append(values)
# if any rows were processed, number-crunch the matrix
if (dataName is not None):
return processDataMatrix(dataMatrix)
else:
return []
我尝试通过基于 Name
列重新分区来实现此工作,然后通过底层上的 mapPartitions
在每个分区上运行 processSegment
RDD:
result = \
stacksDF \
.repartition('Name') \
.rdd \
.mapPartitions(processSegment) \
.collect()
但是,该进程通常无法通过 processSegment
中的 SANITY CHECK
断言:
AssertionError: row name Q7 does not match expected A9
当我尝试在底层 RDD 上运行 mapPartitions
时,为什么表面上在 DataFrame 上执行的分区没有被保留?如果上述方法无效,是否有某种方法(使用 DataFrame API 或 RDD API)使我能够对 DataFrame 分区的内存中呈现执行聚合逻辑?
(由于我使用 PySpark,并且我希望执行的特定数字运算逻辑是 Python,用户定义的聚合函数 (UDAF) would not appear to be an option 。)
最佳答案
我相信您误解了分区的工作原理。一般来说,partioner 是一个满射函数,而不是双射函数。虽然特定值的所有记录都将移动到单个分区,但分区可能包含具有多个不同值的记录。
DataFrame
API 无法让您对分区器进行任何控制,但在使用 RDD
API 时可以定义自定义 partitionFunc
。这意味着您可以使用双射,例如:
mapping = (df
.select("Name")
.distinct()
.rdd.flatMap(lambda x: x)
.zipWithIndex()
.collectAsMap())
def partitioner(x):
return mapping[x]
并按如下方式使用它:
df.rdd.map(lambda row: (row.Name, row)).partitionBy(len(mapping), partitioner)
尽管您可能必须记住分区不是空闲的,并且如果唯一值的数量很大,则可能会成为严重的性能问题。
关于apache-spark - 在 RDD 转换时保留 Spark DataFrame 列分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40666840/