python - 通过其他键将具有非唯一 ID 的列添加到 pyspark 数据帧

标签 python pyspark apache-spark-sql

对标题表示歉意 - 不知道如何轻松总结我的问题。

我有一个 pyspark 数据框,包含 2 列、代码和 emp。每个唯一的代码值都有多个 emp 值,如下所示。我希望添加一列,对于每个唯一的代码值,应用一个递增的数字,例如下面的值列。我玩过monotoniclyIncreasingId(),但没有设法将其 id 创建限制为一个特定的代码键,而且文档确实表明索引不需要按顺序递增。

+----+---+-----+
|code|emp|value|
+----+---+-----+
|   a| 14|    1|
|   a| 22|    2|
|   a| 35|    3|
|   a| 64|    4|
|   b| 12|    1|
...
+----+---+-----+

如果这对效率有影响的话,每个代码值最多有 4 个 emp 值。索引应随着 emp 值的大小而递增 - 最低值应为 1,最高值为 n,其中 n 是具有特定代码的记录数。

最佳答案

您可以将row_number()窗口化函数结合使用。

首先导入Windowrow_number

from pyspark.sql import Window
from pyspark.sql.functions import row_number()

假设您的场景具有以下列和值

>>> cols1 = ['code', 'emp']
>>> vals1 = [
     ('a', 14),
     ('a', 22),
     ('a', 35),
     ('a', 64),
     ('b', 12),
     ('b', 35)
]
# Create a DataFrame
>>> df1 = spark.createDataFrame(vals1, cols1)

# Result of 'df1' table.
>>> df1.show()
+----+---+
|code|emp|
+----+---+
|   a| 14|
|   a| 22|
|   a| 35|
|   a| 64|
|   b| 12|
|   b| 35|
+----+---+

code列上应用row_number()

>>> val = df1.withColumn("value", row_number().over(Window.partitionBy("code").orderBy("emp")))

>>> val.show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
|   b| 12|    1|
|   b| 35|    2|
|   a| 14|    1|
|   a| 22|    2|
|   a| 35|    3|
|   a| 64|    4|
+----+---+-----+

最后,按列code排序即可得到想要的结果。

>>> val.orderBy('code').show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
|   a| 14|    1|
|   a| 22|    2|
|   a| 35|    3|
|   a| 64|    4|
|   b| 12|    1|
|   b| 35|    2|
+----+---+-----+
  • partitionBy:创建一个已定义分区的 WindowSpec。

欲了解更多信息,请参阅:

关于python - 通过其他键将具有非唯一 ID 的列添加到 pyspark 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53065994/

相关文章:

Python Manager通过注册表访问dict

python - Databricks - Pyspark - 使用动态键处理嵌套的 json

scala - 如何在Spark SQL中使用连字符对列名进行转义

apache-spark-sql - Spark SQL from_json 文档

apache-spark - 在 PySpark 数据框中修剪字符串列

python3 线程输出帮助。这是正确的输出吗?

python - Pandas:将分组的 df 转换为以两列作为键、值对的字典列表

apache-spark - 在 Google Colab 中使用图形框架

python - 如何在 python 3.2 中解压缩 c 结构?

python - 作业因阶段失败而中止 : Task 5 in stage 3. 0 失败 1 次