python - 如何使用 Spark Data Frame 中前一行的两列计算一行中的列?

标签 python sql apache-spark apache-spark-sql pyspark

我正在尝试使用 spark(任何类型:pysparksparkspark sql 等)。

我的数据具有以下形状:

+------------+------+------+--------+
| population | rate | year | city   |
+------------+------+------+--------+
| 100        | 0.1  | 1    | one    |
+------------+------+------+--------+
| 100        | 0.11 | 2    | one    |
+------------+------+------+--------+
| 100        | 0.12 | 3    | one    |
+------------+------+------+--------+
| 200        | 0.1  | 1    | two    |
+------------+------+------+--------+
| 1000       | 0.21 | 2    | three  |
+------------+------+------+--------+
| 1000       | 0.22 | 3    | three  |
+------------+------+------+--------+

population 列是错误的(它来自两个表之间的 join,未显示)。

我想用上一行的 population*(1 + rate) 结果更新 population 列。我知道在 sql 中我可以使用 recursive CTEhiveql 不支持它。

你能给我一些建议吗?

最佳答案

据我了解你的描述,你所需要的只是一些基本的代数和窗口函数。首先让我们重新创建示例数据:

import pandas as pd  # Just to make a reproducible example

pdf = pd.DataFrame({
    'city': {0: 'one', 1: 'one', 2: 'one', 3: 'two', 4: 'three', 5: 'three'},
    'population': {0: 100, 1: 100, 2: 100, 3: 200, 4: 1000, 5: 1000},
    'rate': {0: 0.10000000000000001,
     1: 0.11,
     2: 0.12,
     3: 0.10000000000000001,
     4: 0.20999999999999999,
     5: 0.22},
    'year': {0: 1, 1: 2, 2: 3, 3: 1, 4: 2, 5: 3}})

df = sqlContext.createDataFrame(pdf)

df.show()
## +-----+----------+----+----+
## | city|population|rate|year|
## +-----+----------+----+----+
## |  one|       100| 0.1|   1|
## |  one|       100|0.11|   2|
## |  one|       100|0.12|   3|
## |  two|       200| 0.1|   1|
## |three|      1000|0.21|   2|
## |three|      1000|0.22|   3|
## +-----+----------+----+----+

接下来我们定义窗口:

import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import exp, log, sum, first, col, coalesce

# Base window
w = Window.partitionBy("city").orderBy("year")

# ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
wr = w.rowsBetween(-sys.maxsize, -1)

和一些列:

# Take a sum of logarithms of rates over the window
log_sum = sum(log(col("rate") + 1)).over(wr)

# Take sum of logs and exponentiate to go back to original space 
cumulative_rate = exp(log_sum).alias("cumulative_rate")

# Find base population for each group
base_population = first("population").over(w).alias("base_population")

# Prepare final column (base population * cumulative product of rates)
current_population = coalesce(
     # This is null for the first observation in a group
     cumulative_rate * base_population, 
     # so we provide population as an alternative
     col("population")  
).alias("current_population")

最后我们可以如下使用这些

df.select("*", current_population).show()

## +-----+----------+----+----+------------------+
## | city|population|rate|year|current_population|
## +-----+----------+----+----+------------------+
## |three|      1000|0.21|   2|            1000.0|
## |three|      1000|0.22|   3|            1210.0|
## |  two|       200| 0.1|   1|             200.0|
## |  one|       100| 0.1|   1|             100.0|
## |  one|       100|0.11|   2|110.00000000000001|
## |  one|       100|0.12|   3|122.10000000000004|
## +-----+----------+----+----+------------------+

关于python - 如何使用 Spark Data Frame 中前一行的两列计算一行中的列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33662691/

相关文章:

python - 尝试从unittest.TestCase和另一个类继承

python - 读取列标题中包含多个定界符的文件并在末尾跳过某些行

mysql - SQL语句忽略where参数

apache-spark - 为什么 Spark(在 Google Dataproc 上)不使用所有 vcore?

apache-spark - Spark Window 聚合与 Group By/Join 性能

python - 根据前一行的值从数据框中过滤行

python - 在 ubuntu 上将 python 包关联到不同版本的 python

java - 在使用 Java 调用 Spring 存储过程时出现语法错误

sql - 在 SQL 中找到三个连续值的最大总和?

java - 如何使用单个 Spark 上下文在 Apache Spark 中运行并发作业(操作)