apache-spark - 在 RDD 转换时保留 Spark DataFrame 列分区

标签 apache-spark dataframe pyspark apache-spark-sql rdd

给定一个 Spark DataFrame,如下所示:

==================================
| Name | Col1 | Col2 | .. | ColN |
----------------------------------
|    A |    1 |   11 | .. |   21 |
|    A |   31 |   41 | .. |   51 |
|    B |    2 |   12 | .. |   22 |
|    B |   32 |   42 | .. |   52 |
==================================

我想运行逻辑,对与特定 Name 值相对应的表分区执行聚合/计算。所述逻辑要求分区的全部内容——并且该分区——在执行逻辑的节点的内存中具体化;它看起来类似于下面的 processSegment 函数:

def processDataMatrix(dataMatrix):
    # do some number crunching on a 2-D matrix

def processSegment(dataIter):
    # "running" value of the Name column in the iterator
    dataName = None
    # as the iterator is processed, put the data in a matrix
    dataMatrix = []

    for dataTuple in dataIter:
        # separate the name column from the other columns
        (name, *values) = dataTuple
        # SANITY CHECK: ensure that all rows have same name
        if (dataName is None):
            dataName = name
        else:
            assert (dataName == name), 'row name ' + str(name) + ' does not match expected ' + str(dataName)

        # put the row in the matrix
        dataMatrix.append(values)

    # if any rows were processed, number-crunch the matrix
    if (dataName is not None):
        return processDataMatrix(dataMatrix)
    else:
        return []

我尝试通过基于 Name 列重新分区来实现此工作,然后通过底层上的 mapPartitions 在每个分区上运行 processSegment RDD:

result = \
    stacksDF \
        .repartition('Name') \
        .rdd \
        .mapPartitions(processSegment) \
        .collect()

但是,该进程通常无法通过 processSegment 中的 SANITY CHECK 断言:

AssertionError: row name Q7 does not match expected A9

当我尝试在底层 RDD 上运行 mapPartitions 时,为什么表面上在 DataFrame 上执行的分区没有被保留?如果上述方法无效,是否有某种方法(使用 DataFrame API 或 RDD API)使我能够对 DataFrame 分区的内存中呈现执行聚合逻辑?

(由于我使用 PySpark,并且我希望执行的特定数字运算逻辑是 Python,用户定义的聚合函数 (UDAF) would not appear to be an option 。)

最佳答案

我相信您误解了分区的工作原理。一般来说,partioner 是一个满射函数,而不是双射函数。虽然特定值的所有记录都将移动到单个分区,但分区可能包含具有多个不同值的记录。

DataFrame API 无法让您对分区器进行任何控制,但在使用 RDD API 时可以定义自定义 partitionFunc。这意味着您可以使用双射,例如:

mapping = (df
    .select("Name")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .zipWithIndex()
    .collectAsMap())

def partitioner(x):
    return mapping[x]

并按如下方式使用它:

df.rdd.map(lambda row: (row.Name, row)).partitionBy(len(mapping), partitioner)

尽管您可能必须记住分区不是空闲的,并且如果唯一值的数量很大,则可能会成为严重的性能问题。

关于apache-spark - 在 RDD 转换时保留 Spark DataFrame 列分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40666840/

相关文章:

r - 使用 R 按列值对数据框进行排序

python - 如何访问 Spark 稀疏向量元素

apache-spark - Apache 星火 : ERROR Executor -> Iterator

scala - 将本地向量转换为 RDD[向量]

Python Plotnine - 创建堆积条形图

pyspark - from_utc_timestamp 不考虑夏令时

java - Pyspark 中的广播加入得到 OnOutOfMemoryError

hadoop - 用于查询 HDFS 上的数据的纯 spark 与 spark SQL

python - 在 pyspark 中将 RDD 转换为 Dataframe

python - Pandas 数据框在 read_excel 时更改浮点值