python - PySpark 根据列条件删除重复项

标签 python apache-spark pyspark

对 Spark 还很陌生,我正在尝试尽可能干净、高效地完成最终转换。

假设我有一个如下所示的数据框

+------+--------+                  
|ID    | Hit    |                  
+------+--------+
|123   |   0    | 
|456   |   1    |
|789   |   0    |     
|123   |   1    |   
|123   |   0    | 
|789   |   1    |   
|1234  |   0    |
| 1234 |   0    |   
+------+--------+

我试图最终得到一个新的数据帧(或两个,取决于哪个更有效),其中如果一行在“命中”中具有1,则它不能有一行在命中中具有0并且如果有也就是说,0 将根据 ID 列达到不同的级别。

这是我尝试过的方法之一,但我不确定这是否有效 1. 最有效的方法 2.尽可能最干净的方式

dfhits = df.filter(df.Hit == 1)
dfnonhits = df.filter(df.Hit == 0)
dfnonhitsdistinct = dfnonhits.filter(~dfnonhits['ID'].isin(dfhits))

最终数据集如下所示:

+------+--------+                  
|ID    | Hit    |                  
+------+--------+
|456   |   1    |    
|123   |   1    |   
|789   |   1    |   
|1234  |   0    |  
+------+--------+

最佳答案

# Creating the Dataframe.
from pyspark.sql.functions import col
df = sqlContext.createDataFrame([(123,0),(456,1),(789,0),(123,1),(123,0),(789,1),(500,0),(500,0)],
                                ['ID','Hit']) 
df.show()
+---+---+ 
| ID|Hit| 
+---+---+ 
|123|  0| 
|456|  1| 
|789|  0| 
|123|  1| 
|123|  0| 
|789|  1| 
|500|  0| 
|500|  0| 
+---+---+

这个想法是找到每个IDHittotal,如果它大于0,则意味着至少有Hit 中存在一个 1。因此,当此条件为true时,我们将删除所有Hit值为0的

# Registering the dataframe as a temporary view.
df.registerTempTable('table_view')
df=sqlContext.sql(
    'select ID, Hit, sum(Hit) over (partition by ID) as sum_Hit from table_view'
)
df.show()
+---+---+-------+ 
| ID|Hit|sum_Hit| 
+---+---+-------+ 
|789|  0|      1| 
|789|  1|      1| 
|500|  0|      0| 
|500|  0|      0| 
|123|  0|      1| 
|123|  1|      1| 
|123|  0|      1| 
|456|  1|      1| 
+---+---+-------+
df = df.filter(~((col('Hit')==0) & (col('sum_Hit')>0))).drop('sum_Hit').dropDuplicates()
df.show()
+---+---+ 
| ID|Hit|  
+---+---+ 
|789|  1| 
|500|  0| 
|123|  1| 
|456|  1|
+---+---+

关于python - PySpark 根据列条件删除重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54683614/

相关文章:

python - 如何使用新的 NumPy 随机数生成器?

python - 动态查找两个字符串之间的多个空格

java - 如何在 Scala 中使用 mapPartitions?

hash - 如何为每行 rdd 生成哈希? (PYSPARK)

python - 在 hadoop 中加载大的日本文件

python - 使用child_process将函数和参数从 Node 传递到python

python - 系统发育树着色

apache-spark - 无法运行 ALS.train,错误 : java. lang.IllegalArgumentException

apache-spark - 按时间戳分区好还是按年月日小时分区好

apache-spark - 无法使用 rdd.toDF() 但spark.createDataFrame(rdd) 有效