sql - Pyspark:根据多种条件过滤数据框

标签 sql filter pyspark apache-spark-sql pyspark-sql

我想首先根据以下条件过滤数据帧(d<5),其次(如果 col1 中的值等于 col3 中的对应值,则 col2 的值不等于 col4 中的对应值)。

如果原始数据帧DF如下:

+----+----+----+----+---+
|col1|col2|col3|col4|  d|
+----+----+----+----+---+
|   A|  xx|   D|  vv|  4|
|   C| xxx|   D|  vv| 10|
|   A|   x|   A|  xx|  3|
|   E| xxx|   B|  vv|  3|
|   E| xxx|   F| vvv|  6|
|   F|xxxx|   F| vvv|  4|
|   G| xxx|   G| xxx|  4|
|   G| xxx|   G|  xx|  4|
|   G| xxx|   G| xxx| 12|
|   B|xxxx|   B|  xx| 13|
+----+----+----+----+---+

所需的数据框是:

+----+----+----+----+---+
|col1|col2|col3|col4|  d|
+----+----+----+----+---+
|   A|  xx|   D|  vv|  4|
|   A|   x|   A|  xx|  3|
|   E| xxx|   B|  vv|  3|
|   F|xxxx|   F| vvv|  4|
|   G| xxx|   G|  xx|  4|
+----+----+----+----+---+

我试过的代码没有按预期工作:

cols=[('A','xx','D','vv',4),('C','xxx','D','vv',10),('A','x','A','xx',3),('E','xxx','B','vv',3),('E','xxx','F','vvv',6),('F','xxxx','F','vvv',4),('G','xxx','G','xxx',4),('G','xxx','G','xx',4),('G','xxx','G','xxx',12),('B','xxxx','B','xx',13)]
df=spark.createDataFrame(cols,['col1','col2','col3','col4','d'])

df.filter((df.d<5)& (df.col2!=df.col4) & (df.col1==df.col3)).show()

+----+----+----+----+---+
|col1|col2|col3|col4|  d|
+----+----+----+----+---+
|   A|   x|   A|  xx|  3|
|   F|xxxx|   F| vvv|  4|
|   G| xxx|   G|  xx|  4|
+----+----+----+----+---+

我应该怎么做才能达到预期的结果?

最佳答案

你的逻辑条件是错误的。 IIUC,你想要的是:

import pyspark.sql.functions as f

df.filter((f.col('d')<5))\
    .filter(
        ((f.col('col1') != f.col('col3')) | 
         (f.col('col2') != f.col('col4')) & (f.col('col1') == f.col('col3')))
    )\
    .show()

我打破了filter()进入 2 次调用以提高可读性,但您可以等效地在一行中完成。

输出:

+----+----+----+----+---+
|col1|col2|col3|col4|  d|
+----+----+----+----+---+
|   A|  xx|   D|  vv|  4|
|   A|   x|   A|  xx|  3|
|   E| xxx|   B|  vv|  3|
|   F|xxxx|   F| vvv|  4|
|   G| xxx|   G|  xx|  4|
+----+----+----+----+---+

关于sql - Pyspark:根据多种条件过滤数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49301373/

相关文章:

spring-mvc - 如何将 Spring DelegatingFilterProxy 与多个过滤器和 Spring Security 一起使用?

angular - 在Angular 6中通过http get处理复杂对象

Pyspark 日期到时间戳(零小时/午夜)

python - 如何将 Pyspark Dataframe header 设置为另一行?

apache-spark - Spark for Python - 无法将字符串列转换为十进制/ double

mysql - 动态查询 where 子句检查

php - MySQL 从单个选择中进行多次插入

php - 在 php 中的一个字段中输入值并将另一个字段留空会得到错误的结果

sql - 是否可以使用 SELECT FROM DUAL 获得多行?

java - 如何使用 Java 在运行时缩小不同的 javascript 文件