假设我有一个 pyspark DataFrame (DF):
-----------------------------
record_id | foo | bar
-----------------------------
1 | random text | random text
2 | random text | random text
3 | random text | random text
1 | random text | random text
2 | random text | random text
-----------------------------
我的最终目标是使用 .write.jdbc()
将这些行写入 MySQL,我已经成功做到了这一点。但现在,在此之前,根据 record_id
列的唯一性添加一个新列 unique
。
我在使用类似的方法识别唯一的 record_id
方面取得了一些进展:
df.select('record_id').distinct().rdd.map(lambda r: r[0])
但与 Panda 的 DataFrames 不同,我不相信它有可以重用的索引,它似乎只是值。我对 Spark/Pyspark 还很陌生。
尝试找出以下工作流程是否有意义?
- 识别具有不同
record_id
的行,并写入 MySQL - 然后,识别剩余的行,并写入 MySQL
或者是否可以更改原始 DF,根据一些链接命令添加一个新列unique
?类似于下面的内容,然后我可以将其批量写入 MySQL:
----------------------------------
record_id | foo | bar | unique
----------------------------------
1 | random text | random text | 0
2 | random text | random text | 0
3 | random text | random text | 1 # where 1 for boolean True
1 | random text | random text | 0
2 | random text | random text | 0
----------------------------------
如有任何建议或建议,我们将不胜感激!
最佳答案
可以统计行数partitionBy record_id,如果record_id只有一行,将其标记为唯一:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
df.withColumn("unique", (F.count("record_id").over(Window.partitionBy("record_id")) == 1).cast('integer')).show()
+---------+-----------+-----------+------+
|record_id| foo| bar|unique|
+---------+-----------+-----------+------+
| 3|random text|random text| 1|
| 1|random text|random text| 0|
| 1|random text|random text| 0|
| 2|random text|random text| 0|
| 2|random text|random text| 0|
+---------+-----------+-----------+------+
关于python - Pyspark DataFrame 选择具有不同值的行和具有非不同值的行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47211837/