python - Pyspark DataFrame 选择具有不同值的行和具有非不同值的行

标签 python apache-spark dataframe pyspark

假设我有一个 pyspark DataFrame (DF):

-----------------------------
record_id | foo | bar
-----------------------------
1 | random text | random text
2 | random text | random text
3 | random text | random text
1 | random text | random text
2 | random text | random text
-----------------------------

我的最终目标是使用 .write.jdbc() 将这些行写入 MySQL,我已经成功做到了这一点。但现在,在此之前,根据 record_id 列的唯一性添加一个新列 unique

我在使用类似的方法识别唯一的 record_id 方面取得了一些进展:

df.select('record_id').distinct().rdd.map(lambda r: r[0])

但与 Panda 的 DataFrames 不同,我不相信它有可以重用的索引,它似乎只是值。我对 Spark/Pyspark 还很陌生。

尝试找出以下工作流程是否有意义?

  1. 识别具有不同 record_id 的行,并写入 MySQL
  2. 然后,识别剩余的行,并写入 MySQL

或者是否可以更改原始 DF,根据一些链接命令添加一个新列unique?类似于下面的内容,然后我可以将其批量写入 MySQL:

----------------------------------
record_id | foo | bar | unique 
----------------------------------
1 | random text | random text | 0
2 | random text | random text | 0
3 | random text | random text | 1 # where 1 for boolean True
1 | random text | random text | 0
2 | random text | random text | 0
----------------------------------

如有任何建议或建议,我们将不胜感激!

最佳答案

可以统计行数partitionBy record_id,如果record_id只有一行,将其标记为唯一:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

df.withColumn("unique", (F.count("record_id").over(Window.partitionBy("record_id")) == 1).cast('integer')).show()
+---------+-----------+-----------+------+
|record_id|        foo|        bar|unique|
+---------+-----------+-----------+------+
|        3|random text|random text|     1|
|        1|random text|random text|     0|
|        1|random text|random text|     0|
|        2|random text|random text|     0|
|        2|random text|random text|     0|
+---------+-----------+-----------+------+

关于python - Pyspark DataFrame 选择具有不同值的行和具有非不同值的行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47211837/

相关文章:

python - 沿特定维度从 ndarray 中减去矩阵而不重新整形

apache-spark - 将PySpark Dataframe批量写入SQL DB

r - 识别数据框 A 中未包含在数据框 B 中的记录

python - 按照特定模式从列中提取字符串

python - 从旧数据框创建子列

python - mysql :Query set is not working in python

Python字符串 'in'算子实现算法和时间复杂度

python - 如何将多次出现的二进制列转换为 Pandas 中的分类数据

java - 为什么 Spark 应用程序会失败并显示 "Exception in thread "main"java.lang.NoClassDefFoundError : . ..StringDeserializer"?

scala - SparkSession 不接受运行时配置