python - Spark 上的行明智计算

标签 python apache-spark

基于此answer 我需要进行一些逐行计算

result= (reduce(add, (<some row wise calculation on col(x)> for x in df.columns[1:])) / n).alias("result")

但在此之前我需要按降序对行值进行排序(更改数据框中每行的列顺序?) 假设我有以下行

 3,7,21,9
 5,15,10,2
例如,我需要知道每行每个值的排名(顺序),然​​后计算总和(值/索引) 对于第一行

21 ->4,9->3,7->3,3->1,sum(21/4,9/3,7/3,3/1)

第二行

15->4,10->3,5->2,2->1,sum(15/4,10/4,5/2,2/1)

不是重复的,因为我需要不是按列而是按行排序

最佳答案

假设您的输入数据框如下

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|3   |7   |21  |9   |
|5   |15  |10  |2   |
+----+----+----+----+

然后你可以编写一个udf函数来获取你想要的输出列

from pyspark.sql import functions as f
from pyspark.sql import types as t
def sortAndIndex(list):
    return sorted([(value, index+1) for index, value in enumerate(sorted(list))],  reverse=True)

sortAndIndexUdf = f.udf(sortAndIndex, t.ArrayType(t.StructType([t.StructField('key', t.IntegerType(), True), t.StructField('value', t.IntegerType(), True)])))

df.withColumn('sortedAndIndexed', sortAndIndexUdf(f.array([x for x in df.columns])))

这应该给你

+----+----+----+----+----------------------------------+
|col1|col2|col3|col4|sortedAndIndexed                  |
+----+----+----+----+----------------------------------+
|3   |7   |21  |9   |[[21, 4], [9, 3], [7, 2], [3, 1]] |
|5   |15  |10  |2   |[[15, 4], [10, 3], [5, 2], [2, 1]]|
+----+----+----+----+----------------------------------+

更新

您评论为

my calculation should be sum(value/index) so probably using yours udf funcrtion I should return some kind of reduce(add,)?

为此你可以做到

from pyspark.sql import functions as f
from pyspark.sql import types as t
def divideAndSum(list):
    return sum([float(value)/(index+1) for index, value in enumerate(sorted(list))])

divideAndSumUdf = f.udf(divideAndSum, t.DoubleType())

df.withColumn('divideAndSum', divideAndSumUdf(f.array([x for x in df.columns])))

这应该给你

+----+----+----+----+------------------+
|col1|col2|col3|col4|divideAndSum      |
+----+----+----+----+------------------+
|3   |7   |21  |9   |14.75             |
|5   |15  |10  |2   |11.583333333333334|
+----+----+----+----+------------------+

关于python - Spark 上的行明智计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50666790/

相关文章:

configuration - 错误 YarnClientSchedulerBackend : Asked to remove non-existent executor 21

python - MySQL "IN"查询中的多个参数

python - 如何在 Python 中使用计时器解锁条件?

scala - 为什么加入两个数据集并应用过滤器会导致 “error: constructor cannot be instantiated to expected type”?

scala - 如何连接到 Pivotal HD(来自 Spark)?

exception - Apache Spark 任务不可序列化

python - 计算 pySpark 中非唯一列表元素的累积和

python - 有没有办法从 python 异步或并行运行 SQLite 查询?

python - 使用更新的 Python 2.6.2 在 Snow Leopard 上安装 Python Imaging Library (PIL)

python - 从 Django View 中的 HttpResponse 检索 JSON