pandas - make groupby.apply more efficient or convert to spark

Tags: pandas dataframe pyspark apache-spark-sql pandas-groupby

All,

I am using pandas groupby.apply with my own custom function, but I have noticed that it is very, very slow. Could someone help me convert this code to run on a Spark dataframe?

Adding a simple example for people to work with:

import pandas as pd
import operator

df = pd.DataFrame({
    'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
    'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
    'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
})
def get_stats(data_frame):

    # For each grouped data_frame, cutoff all Sounds greater than 99th percentile
    cutoff_99 = data_frame[data_frame.Sounds <= data_frame.Sounds.quantile(.99)]

    # Based on total number of records, select the most-abundant sers
    sers_to_use = max((cutoff_99.Sers.value_counts() / cutoff_99.shape[0]).to_dict().items(), key = operator.itemgetter(1))[0]

    # Give me the average sound of the selected sers
    avg_sounds_of_sers_to_use = cutoff_99.loc[cutoff_99["Sers"] == sers_to_use].Sounds.mean()

    # Pre-allocate lists
    cool = []
    mean_sounds = []
    ratios = []
    _difference = []


    for i in cutoff_99.Sers.unique():
        # add each unique sers of that dataframe 
        cool.append(i) 

        # get the mean sound of that ser
        sers_mean_sounds = (cutoff_99.loc[cutoff_99["Sers"] == i].Sounds).mean()

        # add each mean sound for each sers
        mean_sounds.append(sers_mean_sounds) 

        # get the ratio of the sers to use vs. the current sers; add all of the ratios to the list
        ratios.append(avg_sounds_of_sers_to_use / sers_mean_sounds)

        # get the percent difference and add it to a list
        _difference.append(
            float(
                round(
                    abs(avg_sounds_of_sers_to_use - sers_mean_sounds)
                    / ((avg_sounds_of_sers_to_use + sers_mean_sounds) / 2),
                    2,
                )
                * 100
            )
        )

    # return a series with these lists/values.
    return pd.Series({
        'Cools': cool,
        'Chosen_Sers': sers_to_use,
        'Average_Sounds_99_Percent': mean_sounds,
        'Mean_Ratios': ratios,
        'Percent_Differences': _difference
    }) 

I call the function in pandas as follows: df.groupby('Instruments').apply(get_stats)
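
For reference, much of the per-group cost comes from the Python loop and the repeated boolean masking inside get_stats; the same per-group logic can be vectorized within pandas alone. A minimal sketch, assuming the columns above (note the sers come back in alphabetical rather than first-appearance order, and ties for the most-abundant sers may resolve differently):

import pandas as pd

def get_stats_vectorized(g):
    cut = g[g.Sounds <= g.Sounds.quantile(.99)]
    means = cut.groupby('Sers').Sounds.mean()      # mean sound per sers
    chosen = cut.Sers.value_counts().idxmax()      # most-abundant sers
    ratios = means[chosen] / means
    diffs = ((means[chosen] - means).abs()
             / ((means[chosen] + means) / 2)).round(2) * 100
    return pd.Series({
        'Cools': means.index.tolist(),
        'Chosen_Sers': chosen,
        'Average_Sounds_99_Percent': means.tolist(),
        'Mean_Ratios': ratios.tolist(),
        'Percent_Differences': diffs.tolist(),
    })

# usage is unchanged:
# df.groupby('Instruments').apply(get_stats_vectorized)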

Best Answer

You can implement all of this with pyspark and window functions, as shown below.

Create the pyspark dataframe:

import pandas as pd
from pyspark.sql import SparkSession

# an active SparkSession named `spark` is assumed (as in the pyspark shell);
# otherwise create one:
spark = SparkSession.builder.getOrCreate()

data = {
    'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
    'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
    'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
}

pddf = pd.DataFrame(data)

df = spark.createDataFrame(pddf)
df.show()

Output:

+-----------+----+------+
|Instruments|Sers|Sounds|
+-----------+----+------+
|          A|Wind|    42|
|          B|Tool|    21|
|          A|Wind|    34|
|          B|Wind|    56|
|          A|Tool|    43|
|          C|Tool|    61|
|          C|Tool|    24|
|          B|Wind|    23|
+-----------+----+------+

Computation:

from pyspark.sql import Window
from pyspark.sql import functions as F

wI = Window.partitionBy('Instruments')
wIS = Window.partitionBy('Instruments', 'Sers')

# 99th-percentile cutoff per instrument; percentile_approx returns an
# actual Sounds value here, so the strict < below plays the role of the
# pandas `Sounds <= Sounds.quantile(.99)` filter (for small groups like
# these, both drop each group's maximum)
df = df.withColumn('q', F.expr('percentile_approx(Sounds, 0.99)').over(wI))
df = df.filter(df.Sounds < df.q)

# marker for Chosen_Sers:
# a higher count indicates the most-abundant sers
df = df.withColumn('tmpCount', F.count('Sers').over(wIS))
# the most-abundant sers as a string
df = df.withColumn('Chosen_Sers', F.first('Sers').over(wI.orderBy(F.desc('tmpCount'))))

# mean sound for each sers within an instrument
df = df.withColumn('Average_Sounds_99_Percent', F.mean('Sounds').over(wIS))
#mean sound of the chosen sers
df = df.withColumn('avg_sounds_of_sers_to_use', F.first(F.col('Average_Sounds_99_Percent')).over(wI.orderBy(F.desc('tmpCount'))))

df = df.withColumn('mean_ratios', F.col('avg_sounds_of_sers_to_use')/F.mean('Sounds').over(wIS))

# percent difference, e.g. for B/Wind: |21 - 23| / ((21 + 23) / 2) = 0.0909 -> rounded to 0.09 -> 9.0
df = df.withColumn(
    'percent_differences',
    100 * F.round(
        F.abs(F.col('avg_sounds_of_sers_to_use') - F.col('Average_Sounds_99_Percent'))
        / ((F.col('avg_sounds_of_sers_to_use') + F.col('Average_Sounds_99_Percent')) / 2),
        2,
    ),
)

# up to this point we have a flat table
df.show()

# now build the desired structure and drop all unneeded columns;
# dropDuplicates keeps one row per (Instruments, Sers) so each value is
# collected exactly once (collect_list order is nondeterministic, but
# the four lists stay aligned with one another)
df.dropDuplicates(['Instruments', 'Sers']).groupby('Instruments', 'Chosen_Sers').agg(
    F.collect_list('Sers').alias('Sers'),
    F.collect_list('Average_Sounds_99_Percent').alias('Average_Sounds_99_Percent'),
    F.collect_list('mean_ratios').alias('mean_ratios'),
    F.collect_list('percent_differences').alias('percent_differences'),
).show(truncate=False)

Output:

#just a flat table
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
|Instruments|Sers|Sounds|  q|tmpCount|Chosen_Sers|Average_Sounds_99_Percent|avg_sounds_of_sers_to_use|       mean_ratios|percent_differences|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
|          B|Tool|    21| 56|       1|       Tool|                     21.0|                     21.0|               1.0|                0.0|
|          B|Wind|    23| 56|       1|       Tool|                     23.0|                     21.0|0.9130434782608695|                9.0|
|          C|Tool|    24| 61|       1|       Tool|                     24.0|                     24.0|               1.0|                0.0|
|          A|Wind|    42| 43|       2|       Wind|                     38.0|                     38.0|               1.0|                0.0|
|          A|Wind|    34| 43|       2|       Wind|                     38.0|                     38.0|               1.0|                0.0|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
#desired structure
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|Instruments|Chosen_Sers|Sers        |Average_Sounds_99_Percent|mean_ratios              |percent_differences|
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|B          |Tool       |[Tool, Wind]|[21.0, 23.0]             |[1.0, 0.9130434782608695]|[0.0, 9.0]         |
|C          |Tool       |[Tool]      |[24.0]                   |[1.0]                    |[0.0]              |
|A          |Wind       |[Wind]      |[38.0]                   |[1.0]                    |[0.0]              |
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
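
As a final note: on Spark 3.0+ there is also a shortcut that avoids rewriting the logic at all. A grouped-map pandas UDF via applyInPandas runs the original pandas function on each group in parallel across executors. The wrapper and schema below are a minimal sketch, assuming the original get_stats and the df created from pddf above (before any of the withColumn steps):

import pandas as pd

def get_stats_pdf(pdf: pd.DataFrame) -> pd.DataFrame:
    # reuse the original pandas function, then reshape its Series
    # result into the one-row DataFrame applyInPandas expects
    out = get_stats(pdf)
    return pd.DataFrame({
        'Instruments': [pdf['Instruments'].iloc[0]],
        'Chosen_Sers': [out['Chosen_Sers']],
        'Cools': [out['Cools']],
        'Average_Sounds_99_Percent': [out['Average_Sounds_99_Percent']],
        'Mean_Ratios': [out['Mean_Ratios']],
        'Percent_Differences': [out['Percent_Differences']],
    })

schema = ('Instruments string, Chosen_Sers string, Cools array<string>, '
          'Average_Sounds_99_Percent array<double>, '
          'Mean_Ratios array<double>, Percent_Differences array<double>')

df.groupby('Instruments').applyInPandas(get_stats_pdf, schema).show(truncate=False)

This keeps pandas semantics exactly (e.g. the interpolated quantile(.99)), so results can differ slightly from the percentile_approx window version above.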

Regarding pandas - make groupby.apply more efficient or convert to spark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61083987/
