apache-spark - 使用至少包含一个特定值的窗口过滤数据框

标签 apache-spark pyspark

假设您有一个包含两列的 pyspark 数据框:idval
您想要过滤行,其中单个 id 窗口至少有一个具有特定值的 val 条目。

例如如果我们想要获取行,其中 id 窗口的 val 列中至少有一个 5 值:

# Input:

df = spark.createDataFrame(
    [(1, '0'), 
     (1, '5'),
     (2, '2'),
     (2, '5'),    
     (2, '5'),
     (3, '1'),
     (3, '0'),],
     ['id', 'val']
)

# Desired output:

+---+----+
| id| val|
+---+----+
|  1|   0|
|  1|   5|
|  2|   2|
|  2|   5|
|  2|   5|
+---+----+

我想过以某种方式使用窗口函数吗?

最佳答案

使用 Spark-2.4 中的 array_contains() 函数,然后加入 df 以仅获取具有以下内容的 id其中 5 值。

示例:

from pyspark.sql.functions import *
from pyspark.sql.types import *

df1=df.groupBy("id").\
agg(array_contains(collect_set(col("val")).cast("array<int>"),5).alias("has_5")).\
filter(col("has_5")).\
drop('has_5')

df.join(df1,['id'],'inner').show()
#+---+---+
#| id|val|
#+---+---+
#|  1|  0|
#|  1|  5|
#|  2|  2|
#|  2|  5|
#|  2|  5|
#+---+---+

使用窗口函数的另一种方式:

import sys
from pyspark.sql import *

w=Window.partitionBy("id").orderBy("val").rowsBetween(-sys.maxsize,sys.maxsize)

df.withColumn("has_5",array_contains(collect_set(col("val")).over(w).cast("array<int>"),5)).\
filter(col("has_5")).\
drop("has_5").\
show()
#+---+---+
#| id|val|
#+---+---+
#|  1|  0|
#|  1|  5|
#|  2|  2|
#|  2|  5|
#|  2|  5|
#+---+---+

关于apache-spark - 使用至少包含一个特定值的窗口过滤数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63451475/

相关文章:

apache-spark - 如何理解Spark MLlib的libsvm的格式类型?

apache-spark - 计算 pyspark Dataframe 中的列数?

apache-spark - 在 Pyspark 中将稀疏向量转换为密集向量

python - 将 PySpark 数据框列类型转换为字符串并替换方括号

azure - 如何通过命令行界面将作业(jar)提交到Azure Spark集群?

java - 获取broadcast5的broadcast_5_piece0失败

python - 根据列减去2个pyspark数据帧

azure - Azure Synapse Notebook 中的 WriteStream(格式为 ('console' )

java - Spark Structured Streaming 自动将时间戳转换为本地时间

java - 如何在 Web 上运行 Apache Spark 作业后获取输出