python - 如何使用 PySpark 删除数据框中基于多个过滤器的列?

标签 python python-3.x dataframe apache-spark pyspark

我有一个单元格可以具有的有效值的列表。如果一列中的一个单元格无效,我需要删除整列。我知道有删除特定列中的行的答案,但在这里我将删除整个列,即使其中一个单元格无效。有效/无效的条件是一个单元格只能有三个值:['Messi', 'Ronaldo', 'Virgil']

我尝试阅读有关过滤的内容,但我所能看到的只是过滤列并删除行。例如this问题。我还读到,应该避免在 Spark 中进行过多的扫描和洗牌,我同意这一点。

我不仅查看代码解决方案,还查看 PySpark 提供的现成代码。我希望它不会超出 SO 答案的范围。

对于以下输入数据框:

| Column 1      | Column 2      | Column 3      | Column 4      | Column 5      |
| --------------| --------------| --------------| --------------| --------------|
|  Ronaldo      | Salah         |  Messi        |               |Salah          |
|  Ronaldo      | Messi         |  Virgil       |  Messi        | null          |
|  Ronaldo      | Ronaldo       |  Messi        |  Ronaldo      | null          |

我期望以下输出:

| Column 1      | Column 2      |
| --------------| --------------| 
|  Ronaldo      | Messi         |
|  Ronaldo      | Virgil        |
|  Ronaldo      | Messi         |

最佳答案

I am not only looking at the code solution but more on the off-the-shelf code provided from PySpark.

不幸的是,Spark 被设计为逐行并行操作。过滤列并不是一个“现成的代码”解决方案。

尽管如此,您可以采取以下一种方法:

首先收集每列中无效元素的计数。

from pyspark.sql.functions import col, lit, sum as _sum, when

valid = ['Messi', 'Ronaldo', 'Virgil']
invalid_counts = df.select(
    *[_sum(when(col(c).isin(valid), lit(0)).otherwise(lit(1))).alias(c) for c in df.columns]
).collect()
print(invalid_counts)
#[Row(Column 1=0, Column 2=1, Column 3=0, Column 4=1, Column 5=3)]

此输出将是一个仅包含一个元素的列表。您可以迭代此元素中的项目以查找要保留的列。

valid_columns = [k for k,v in invalid_counts[0].asDict().items() if v == 0]
print(valid_columns)
#['Column 3', 'Column 1']

现在只需从原始 DataFrame 中选择这些列即可。如果您想保持原始列顺序,可以先使用 list.indexvalid_columns 进行排序。

valid_columns = sorted(valid_columns, key=df.columns.index)
df.select(valid_columns).show()
#+--------+--------+
#|Column 1|Column 3|
#+--------+--------+
#| Ronaldo|   Messi|
#| Ronaldo|  Virgil|
#| Ronaldo|   Messi|
#+--------+--------+

关于python - 如何使用 PySpark 删除数据框中基于多个过滤器的列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58120774/

相关文章:

python - urllib open - 如何控制重试次数

python - 如何在 python 3 中将二进制值转换为文本或 ascii?

Python DataFrame 从每日数据中选择每月增量的行

Python 字符编码欧洲口音

python - Django 应用程序的照片管理

尝试使用 Google Maps API 时出现 Python socket.py gaierror

python - 如果元素总和为给定数字,则追加到列表中

arrays - cv.fillPoly生成零数组,不读取输入

python - Python 中检查对象属性是否分配了 DataFrame 的最有效方法?

python - 从 R 到 Python 的 case_when 函数