python - pyspark 1.6 中 Pandas 分位数和切割的替代方法是什么

标签 python pandas pyspark apache-spark-sql

我是 pyspark 的新手。我有如下所示的 Pandas 代码。

bindt = df[df[var].notnull()][var].quantile([0,.1,.2,.3,.4,.5,.6,.7,.8,.9,1]).unique()

df['{0}_quartile'.format(var)] = pd.cut(df[var], bindt, labels=False, include_lowest=True )

我在 pyspark 2.x 中找到了“approxQuantile”,但在 pyspark 1.6.0 中没有找到任何此类

我的示例输入:

df.show()

+-----------+----------+---------------+--------------+------------------------+
|  id       | col_1    |col_2          |col_3         |col_4                   |
+-----------+----------+---------------+--------------+------------------------+
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.660000|
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.900000|
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.660000|
|1.10919E+16|  36718.55|           null|          null|                0.860000|
|1.10919E+16|  36718.55|           null|          null|                0.780000|
|1.10919E+16|  36718.55|           null|          null|                0.660000|
|1.10919E+16|  36718.55|           null|          null|                0.900000|
|1.10919E+16|  36718.55|           null|          null|                0.660000|

df.collect()

[Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.080000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.780000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.780000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.860000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.330000'))]

我必须为所有输入列循环上述逻辑。

for var in df.columns:
    bindt = df[df[var].notnull()][var].quantile([0,.1,.2,.3,.4,.5,.6,.7,.8,.9,1]).unique()    
    df['{0}_quartile'.format(var)] = pd.cut(df[var], bindt, labels=False, include_lowest=True )

谁能建议如何在 pyspark 1.6 dataframe 中重写上述代码。

提前致谢

最佳答案

如果您使用的是 pyspark 2.x,则可以使用 QuantileDiscretizer来自使用 approxQuantile() 的 ml lib和 Bucketizer在幕后。

但是,由于您使用的是 pyspark 1.6.x,因此您需要:

1。查找列的分位数

您可以通过两种方式找到分位数值:

  1. 通过计算 percent_rank() 来计算列的百分位数并提取百分位值接近所需分位数的列值

  2. 关注the methods in this answer其中解释了如何使用 pyspark < 2.0.0

  3. 执行分位数近似

这是我的近似分位数值的示例实现:

from pyspark.sql import functions as F
from pyspark.sql import Window

def compute_quantiles(df, col, quantiles):
  quantiles = sorted(quantiles)

  # 1. compute percentile
  df = df.withColumn("percentile", F.percent_rank().over(Window.orderBy(col)))

  # 2. categorize quantile based on the desired quantile and compute errors
  df = df.withColumn("percentile_cat1", F.lit(-1.0))
  df = df.withColumn("percentile_err1", F.lit(-1.0))
  df = df.withColumn("percentile_cat2", F.lit(-1.0))
  df = df.withColumn("percentile_err2", F.lit(-1.0))

  # check percentile with the lower boundaries
  for idx in range(0, len(quantiles)-1):
    q = quantiles[idx]
    df = df.withColumn("percentile_cat1", F\
                       .when( (F.col("percentile_cat1") == -1.0) & 
                             (F.col("percentile") <= q), q)\
                       .otherwise(F.col("percentile_cat1")))
    df = df.withColumn("percentile_err1", F\
                       .when( (F.col("percentile_err1") == -1.0) & 
                             (F.col("percentile") <= q), 
                             F.pow(F.col("percentile") - q, 2))\
                       .otherwise(F.col("percentile_err1")))

  # assign the remaining -1 values in the error to the largest squared error of 1
  df = df.withColumn("percentile_err1", F\
                     .when(F.col("percentile_err1") == -1.0, 1)\
                     .otherwise(F.col("percentile_err1")))

  # check percentile with the upper boundaries
  for idx in range(1, len(quantiles)):
    q = quantiles[idx]
    df = df.withColumn("percentile_cat2", F\
                       .when((F.col("percentile_cat2") == -1.0) & 
                             (F.col("percentile") <= q), q)\
                       .otherwise(F.col("percentile_cat2")))
    df = df.withColumn("percentile_err2",F\
                       .when((F.col("percentile_err2") == -1.0) & 
                             (F.col("percentile") <= q), 
                             F.pow(F.col("percentile") - q, 2))\
                       .otherwise(F.col("percentile_err2")))

  # assign the remaining -1 values in the error to the largest squared error of 1
  df = df.withColumn("percentile_err2", F\
                     .when(F.col("percentile_err2") == -1.0, 1)\
                     .otherwise(F.col("percentile_err2")))

  # select the nearest quantile to the percentile
  df = df.withColumn("percentile_cat", F\
                     .when(F.col("percentile_err1") < F.col("percentile_err2"), 
                           F.col("percentile_cat1"))\
                     .otherwise(F.col("percentile_cat2")))
  df = df.withColumn("percentile_err", F\
                     .when(F.col("percentile_err1") < F.col("percentile_err2"), 
                           F.col("percentile_err1"))\
                     .otherwise(F.col("percentile_err2")))

  # 3. approximate quantile values by choosing the value with the lowest error at each percentile category
  df = df.withColumn("approx_quantile", F\
                     .first(col).over(Window\
                                      .partitionBy("percentile_cat")\
                                      .orderBy(F.asc("percentile_err"))))

  return df

def extract_quantiles(df):
  df_quantiles = df.select("percentile_cat", "approx_quantile").distinct()
  rows = df_quantiles.collect()
  quantile_values = [ row.approx_quantile for row in rows ]

  return quantile_values

我想从上面实现的是计算列中每一行的百分位数,并将其分类到最接近的分位数。可以通过选择与百分位数具有最低差异(平方误差)的最低分位数类别来将百分位数分类为最接近的分位数。

<强>1。计算百分位数

首先,我使用 percent_rank() 计算列的百分位数, 一个 Window function在 pyspark 中。您可以将 Window 视为数据的分区规范。由于percent_rank()是Window函数,所以需要传入Window。

<强>2。将百分位数分类为分位数边界并计算误差

最接近百分位数的分位数类别可以低于、等于高于。因此,我需要两次计算误差:第一次将百分位数与分位数下限进行比较,第二次将其与分位数上限进行比较。请注意,≤ 运算符用于检查百分位数是否小于或等于 边界。在知道百分位数的直接上下分位数边界后,我们可以通过选择低于或等于或高于或等于误差最小的分位数类别,将百分位数分配给最近的分位数类别。

<强>3。近似分位数

一旦我们知道每个百分位数的所有最接近的分位数类别,我们就可以近似分位数值:它是每个分位数类别中误差最低的值。可以使用 first() 计算此近似分位数值使用 Window 在每个类别分区上运行函数.接下来,要提取分位数值,我们只需从数据框中选择唯一的 percentileCategory-approxQuantileValue 对。


在使用 desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] 测试我的数据(~10000 行)后,我发现我的示例实现非常接近 approxQuantile 结果。当我减少提供给 approxQuantile 的误差时,两个结果值变得更接近。

使用 extract_quantiles(compute_quantile(df, col, quantiles)):

enter image description here

使用 approxQuantile:

enter image description here

2。使用 Bucketizer

找到分位数后,您可以使用 pyspark 的 Bucketizer 根据分位数对值进行分桶。 Bucketizer 在两个 pyspark 1.6.x 中都可用 [1] [2]和 2.x [3] [4]

这是一个如何执行分桶的示例:

from pyspark.ml.feature import Bucketizer

bucketedData = df
desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] # must be sorted

for col in df.columns:
  quantile_values = extract_quantiles(compute_quantiles(df, col, desired_quantiles))
  splits = [ boundary_values ] # replace this with quantile_values

  bucketizer = Bucketizer()\
    .setInputCol(col)\
    .setOutputCol("{}_quantile".format(col))\
    .setSplits(splits)
  bucketedData = bucketizer.transform(bucketedData)

您可以将 value_boundaries 替换为您在第 1 步中找到的分位数值或您想要的任何桶分割范围。当您使用 bucketizer 时,整个列值范围必须包含在拆分中。否则,指定拆分之外的值将被视为错误。如果您不确定该值,则必须显式提供无限值,例如 -float("inf")float("inf") 以涵盖所有 float 值数据的边界。

关于python - pyspark 1.6 中 Pandas 分位数和切割的替代方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54803107/

相关文章:

python - 用 scapy 在 python 中编写一个以太网桥

python - 从 Pandas 数据框中过滤数据

python-3.x - 来自 python worker :. 的错误 .. SyntaxError: invalid syntax

function - 加入PySpark时有right_anti吗?

python - 如何获取执行当前线程的文件的当前路径?

python - 显示多索引 Pandas 数据框的前 10 行

python - Beautifulsoup 通过 <br/> 拆分标签中的文本

python - 使用 IN 运算符在 pandas.Series 中查找字符串

python - pandas 自动从具有列名称的系列列表创建数据框

authentication - 通过 Spark 本地读取 S3 文件(或更好的 : pyspark)