python - PySpark 中可变列数的总和

标签 python apache-spark pyspark apache-spark-sql

我有一个像这样的 Spark DataFrame:

+-----+--------+-------+-------+-------+-------+-------+
| Type|Criteria|Value#1|Value#2|Value#3|Value#4|Value#5|
+-----+--------+-------+-------+-------+-------+-------+
|  Cat|       1|      1|      2|      3|      4|      5|
|  Dog|       2|      1|      2|      3|      4|      5|
|Mouse|       4|      1|      2|      3|      4|      5|
|  Fox|       5|      1|      2|      3|      4|      5|
+-----+--------+-------+-------+-------+-------+-------+

您可以使用以下代码重现它:

data = [('Cat', 1, 1, 2, 3, 4, 5),
        ('Dog', 2, 1, 2, 3, 4, 5),
        ('Mouse', 4, 1, 2, 3, 4, 5),
        ('Fox', 5, 1, 2, 3, 4, 5)]
columns = ['Type', 'Criteria', 'Value#1', 'Value#2', 'Value#3', 'Value#4', 'Value#5']
df = spark.createDataFrame(data, schema=columns)
df.show()

我的任务是添加总计列,该列是所有值列的总和,其中 # 不超过该行的条件。

在此示例中:

  • 对于 'Cat' 行:条件为 1,因此 Total 只是 Value#1
  • 对于 'Dog' 行:条件为 2,因此 TotalValue#1 的总和和值#2
  • 对于行 'Fox':条件为 5,因此 Total 是所有列的总和 (Value#1Value#5)。

结果应如下所示:

+-----+--------+-------+-------+-------+-------+-------+-----+
| Type|Criteria|Value#1|Value#2|Value#3|Value#4|Value#5|Total|
+-----+--------+-------+-------+-------+-------+-------+-----+
|  Cat|       1|      1|      2|      3|      4|      5|    1|
|  Dog|       2|      1|      2|      3|      4|      5|    3|
|Mouse|       4|      1|      2|      3|      4|      5|   10|
|  Fox|       5|      1|      2|      3|      4|      5|   15|
+-----+--------+-------+-------+-------+-------+-------+-----+

我可以使用Python UDF来完成,但是我的数据集很大,并且Python UDF由于序列化而很慢。我正在寻找纯粹的 Spark 解决方案。

我正在使用 PySpark 和 Spark 2.1

最佳答案

您可以轻松地将解决方案调整为 PySpark: compute row maximum of the subset of columns and add to an exisiting dataframe通过 user6910411

from pyspark.sql.functions import col, when

total = sum([
    when(col("Criteria") >= i, col("Value#{}".format(i))).otherwise(0)
    for i in range(1, 6)
])

df.withColumn("total", total).show()

# +-----+--------+-------+-------+-------+-------+-------+-----+
# | Type|Criteria|Value#1|Value#2|Value#3|Value#4|Value#5|total|
# +-----+--------+-------+-------+-------+-------+-------+-----+
# |  Cat|       1|      1|      2|      3|      4|      5|    1|
# |  Dog|       2|      1|      2|      3|      4|      5|    3|
# |Mouse|       4|      1|      2|      3|      4|      5|   10|
# |  Fox|       5|      1|      2|      3|      4|      5|   15|
# +-----+--------+-------+-------+-------+-------+-------+-----+

对于任意一组订单列,定义一个列表:

cols = df.columns[2:]

并将总计重新定义为:

total_ = sum([
    when(col("Criteria") > i, col(cols[i])).otherwise(0)
    for i in range(len(cols))
])

df.withColumn("total", total_).show()
# +-----+--------+-------+-------+-------+-------+-------+-----+
# | Type|Criteria|Value#1|Value#2|Value#3|Value#4|Value#5|total|
# +-----+--------+-------+-------+-------+-------+-------+-----+
# |  Cat|       1|      1|      2|      3|      4|      5|    1|
# |  Dog|       2|      1|      2|      3|      4|      5|    3|
# |Mouse|       4|      1|      2|      3|      4|      5|   10|
# |  Fox|       5|      1|      2|      3|      4|      5|   15|
# +-----+--------+-------+-------+-------+-------+-------+-----+

重要:

这里 sum__builtin__.sum 而不是 pyspark.sql.functions.sum

关于python - PySpark 中可变列数的总和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51732843/

相关文章:

python - 树莓派矩阵乘法

python - 字典上的列表理解给出空列表

python - 聚类经纬度gps数据

apache-spark - 如果我没有足够的内存,将会产生什么 Spark ?

将空列替换为 r 中 Spark 数据帧中另一列中的值

apache-spark - 为什么 pyspark 中的 "serialized results of n tasks (XXXX MB)"可能大于 `spark.driver.memory`?

python - 键/值对的两个字典列表的交集

apache-spark - Spark 。约 1 亿行。大小超过 Integer.MAX_VALUE?

python - key 错误 : SPARK_HOME during SparkConf initialization

python - PySpark 无法将字典的 RDD 转换为 DataFrame。错误 : can not accept object in type <class 'pyspark.sql.types.Row' >