apache-spark - pyspark approxQuantile 函数

标签 apache-spark pyspark apache-spark-sql

我有包含以下列的数据框:idpricetimestamp

我想查找按 id 分组的中值。

我正在使用此代码来查找它,但它给了我这个错误。

from pyspark.sql import DataFrameStatFunctions as statFunc
windowSpec = Window.partitionBy("id")
median = statFunc.approxQuantile("price",
                                 [0.5],
                                 0) \
                 .over(windowSpec)

return df.withColumn("Median", median)

是否无法使用DataFrameStatFunctions在新列中填充值?

TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)

最佳答案

嗯,确实不可能使用approxQuantile来填充新数据框列中的值,但这不是您收到此错误的原因。不幸的是,整个底层故事是一个相当令人沮丧的故事,如 I have argued许多 Spark(尤其是 PySpark)功能就是这种情况,而且它们缺乏足够的文档。

首先,没有一个,而是两个 approxQuantile 方法; first one是标准 DataFrame 类的一部分,即您不需要导入 DataFrameStatFunctions:

spark.version
# u'2.1.1'

sampleData = [("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)]

df = spark.createDataFrame(sampleData, schema=["Name","Role","Salary"])
df.show()
# +------+---------+------+ 
# |  Name|     Role|Salary|
# +------+---------+------+
# |   bob|Developer|125000| 
# |  mark|Developer|108000|
# |  carl|   Tester| 70000|
# | peter|Developer|185000|
# |   jon|   Tester| 65000|
# | roman|   Tester| 82000|
# | simon|Developer| 98000|
# |  eric|Developer|144000|
# |carlos|   Tester| 75000|
# | henry|Developer|110000|
# +------+---------+------+

med = df.approxQuantile("Salary", [0.5], 0.25) # no need to import DataFrameStatFunctions
med
# [98000.0]

The second oneDataFrameStatFunctions 的一部分,但如果您按照您的方式使用它,则会收到报告的错误:

from pyspark.sql import DataFrameStatFunctions as statFunc
med2 = statFunc.approxQuantile( "Salary", [0.5], 0.25)
# TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)

因为正确的用法是

med2 = statFunc(df).approxQuantile( "Salary", [0.5], 0.25)
med2
# [82000.0]

尽管您无法在 PySpark 文档中找到有关此问题的简单示例(我自己花了一些时间才弄清楚)...最好的部分?这两个值不相等:

med == med2
# False

我怀疑这是由于使用了不确定性算法(毕竟,它应该是一个近似中值),即使您使用相同的玩具数据重新运行命令你可能会得到不同的值(并且与我在这里报告的值不同) - 我建议尝试一下以获得感觉......

但是,正如我已经说过的,这并不是您不能使用 approxQuantile 在新数据帧列中填充值的原因 - 即使您使用正确的语法,您也会收到不同的错误:

df2 = df.withColumn('median_salary', statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# AssertionError: col should be Column

这里,col指的是withColumn操作的第二个参数,即approxQuantile,错误消息显示它是不是 Column 类型 - 事实上,它是一个列表:

type(statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
# list

因此,在填充列值时,Spark 需要 Column 类型的参数,并且您不能使用列表;下面是创建一个新列的示例,其中每个角色的平均值而不是中值:

import pyspark.sql.functions as func
from pyspark.sql import Window

windowSpec = Window.partitionBy(df['Role'])
df2 = df.withColumn('mean_salary', func.mean(df['Salary']).over(windowSpec))
df2.show()
# +------+---------+------+------------------+
# |  Name|     Role|Salary|       mean_salary| 
# +------+---------+------+------------------+
# |  carl|   Tester| 70000|           73000.0| 
# |   jon|   Tester| 65000|           73000.0|
# | roman|   Tester| 82000|           73000.0|
# |carlos|   Tester| 75000|           73000.0|
# |   bob|Developer|125000|128333.33333333333|
# |  mark|Developer|108000|128333.33333333333| 
# | peter|Developer|185000|128333.33333333333| 
# | simon|Developer| 98000|128333.33333333333| 
# |  eric|Developer|144000|128333.33333333333|
# | henry|Developer|110000|128333.33333333333| 
# +------+---------+------+------------------+

它之所以有效,是因为与approxQuantile相反,mean返回一个Column:

type(func.mean(df['Salary']).over(windowSpec))
# pyspark.sql.column.Column

关于apache-spark - pyspark approxQuantile 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45287832/

相关文章:

apache-spark - 无法在远程模式下将 SparkGraphComputer 与 Tinkerpop 3.2.3 和 Janusgraph 0.1.1 一起使用

scala - (run-main-0) java.lang.NoSuchMethodError

java - 使用 Spark 从 Azure Blob 读取数据

pyspark - 在 pyspark 中聚合 One-Hot 编码功能

apache-spark - Spark-Csv 写引用模式不起作用

apache-spark - Cassandra datastax 驱动程序连接突然终止

python - 获取数据框列及其值作为 pyspark 中的变量

apache-spark - 从 pyspark 数据帧中减去平均值

pyspark - 如何迭代 Spark 数据帧的一列并逐个访问其中的值?

java - 如何将整列的大小写更改为小写?