apache-spark - Spark DataFrame filter operation

Tags: apache-spark pyspark apache-spark-sql

I have a Spark DataFrame and a filter string to apply to it. The filter only selects some of the rows, but I would also like to know the reason why the remaining rows were not selected. Example:

DataFrame columns: customer_id|col_a|col_b|col_c|col_d

Filter string: col_a > 0 & col_b > 4 & col_c < 0 & col_d=0

[image: example of the expected output, with a reason_for_exclusion column for each excluded row]

...and so on.

reason_for_exclusion can be any string or letter, as long as it explains why a particular row was excluded.

I could split the filter string and apply each condition separately (something along the lines of the sketch below), but my filter strings are huge, so that would be very inefficient. I just want to check whether there is a better way to do this.
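
For illustration, this is roughly what I mean by splitting the filter string and applying each condition on its own (split_and_filter is a hypothetical helper; it assumes every condition is a valid Spark SQL expression and that conditions are joined with &):

from pyspark.sql.functions import expr

def split_and_filter(df, filter_string):
    # apply each condition separately: one extra pass over the data per condition
    conditions = [c.strip() for c in filter_string.split("&")]
    for c in conditions:
        excluded = df.filter(~expr(c))
        print(f"condition '{c}' excludes {excluded.count()} rows")
    # rows that survive all conditions
    return df.filter(" AND ".join(conditions))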

Thanks

Best answer

You will have to evaluate every condition in the filter expression, which can be expensive compared to a plain filter operation. I would suggest showing the same reason for all excluded rows, since each of them fails at least one condition in that expression. It is not pretty, but I prefer it because it is efficient, especially when you have to handle very large DataFrames.

# assumes an active SparkSession available as `spark`
from pyspark.sql.functions import expr, lit, when

data = [(1, 1, 5, -3, 0), (2, 0, 10, -1, 0), (3, 0, 10, -4, 1)]
df = spark.createDataFrame(data, ["customer_id", "col_a", "col_b", "col_c", "col_d"])

filter_expr = "col_a > 0 AND col_b > 4 AND col_c < 0 AND col_d=0"

# tag every row that does NOT satisfy the full filter expression with the expression itself
filtered_df = df.withColumn(
    "reason_for_exclusion",
    when(~expr(filter_expr), lit(filter_expr)).otherwise(lit(None))
)
filtered_df.show(truncate=False)

Output:

+-----------+-----+-----+-----+-----+-------------------------------------------------+
|customer_id|col_a|col_b|col_c|col_d|reason_for_exclusion                             |
+-----------+-----+-----+-----+-----+-------------------------------------------------+
|1          |1    |5    |-3   |0    |null                                             |
|2          |0    |10   |-1   |0    |col_a > 0 AND col_b > 4 AND col_c < 0 AND col_d=0|
|3          |0    |10   |-4   |1    |col_a > 0 AND col_b > 4 AND col_c < 0 AND col_d=0|
+-----------+-----+-----+-----+-----+-------------------------------------------------+
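
If you then need the kept and excluded rows as separate DataFrames, you can simply split filtered_df on that column; a small usage sketch based on the code above:

from pyspark.sql.functions import col

# rows that passed the filter (no exclusion reason)
kept_df = filtered_df.where(col("reason_for_exclusion").isNull())
# rows that were excluded, tagged with the full filter expression
excluded_df = filtered_df.where(col("reason_for_exclusion").isNotNull())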

Edit:

Now, if you really want to display only the conditions that failed, you can turn each condition into a separate column and evaluate them with a DataFrame select. You then have to check which of those columns evaluate to False to know which condition failed.

You can name these columns <PREFIX>_<condition> so that you can easily identify them later. Here is a complete example:

from pyspark import StorageLevel
from pyspark.sql.functions import array, array_except, col, expr, lit, when

filter_expr = "col_a > 0 AND col_b > 4 AND col_c < 0 AND col_d=0"
COLUMN_FILTER_PREFIX = "filter_validation_"
original_columns = [col(c) for c in df.columns]

# create column for each condition in filter expression
condition_columns = [expr(f).alias(COLUMN_FILTER_PREFIX + f) for f in filter_expr.split("AND")]

# evaluate condition to True/False and persist the DF with calculated columns
filtered_df = df.select(original_columns + condition_columns)
filtered_df = filtered_df.persist(StorageLevel.MEMORY_AND_DISK)

# get back columns we calculated for filter
filter_col_names = [c for c in filtered_df.columns if COLUMN_FILTER_PREFIX in c]
# for each condition column that evaluated to False, keep the condition text as the reason
filter_columns = [
    when(~col(f"`{c}`"), lit(c.replace(COLUMN_FILTER_PREFIX, "")))
    for c in filter_col_names
]

# collect the reasons into an array, dropping the nulls produced by passing conditions
array_reason_filter = array_except(array(*filter_columns), array(lit(None)))
df_with_filter_reason = filtered_df.withColumn("reason_for_exclusion", array_reason_filter)

df_with_filter_reason.select(*original_columns, col("reason_for_exclusion")).show(truncate=False)

# output
+-----------+-----+-----+-----+-----+----------------------+
|customer_id|col_a|col_b|col_c|col_d|reason_for_exclusion  |
+-----------+-----+-----+-----+-----+----------------------+
|1          |1    |5    |-3   |0    |[]                    |
|2          |0    |10   |-1   |0    |[col_a > 0 ]          |
|3          |0    |10   |-4   |1    |[col_a > 0 ,  col_d=0]|
+-----------+-----+-----+-----+-----+----------------------+
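
If you only care about the rows that were actually excluded, you can keep the non-empty reason arrays and then release the persisted DataFrame; a small follow-up sketch based on the code above:

from pyspark.sql.functions import size

# keep only rows with at least one failed condition, without the helper columns
excluded_df = df_with_filter_reason \
    .where(size(col("reason_for_exclusion")) > 0) \
    .select(*original_columns, col("reason_for_exclusion"))
excluded_df.show(truncate=False)

# release the cached intermediate DataFrame once you are done with it
filtered_df.unpersist()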

On apache-spark - Spark DataFrame filter operation, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/59229793/
