apache-spark - 如何在spark查询中不硬编码任何列名的情况下检查一行的所有列是否为空?

标签 apache-spark pyspark

如何在 pyspark 的查询中不硬编码任何列名的情况下检查一行的所有列是否为空?我有一个对象类型的列,包含 json 格式的字符串。如果列的所有 json 字段都为空,我想引发异常。目前,如果我执行columnname.jsonfield,那么我的代码会按预期抛出异常,但我想检查所有字段是否为空。我无法对所有 json 字段进行硬编码,因为我的一些表包含 200 多个 json 字段

例如: 列名是值 {"active":null,"id":null,"uuid":null}

Value.active 工作正常,但我需要类似于 value.* 的东西,它在 pyspark 的 where 子句中不起作用。

最佳答案

您可以展平数据框中的(可能是嵌套的)列,然后检查展平结构的每个元素是否为空:

from pyspark.sql import functions as F
from pyspark.sql import types as T
from functools import reduce

#create some testdata with nested structures
data = ['{"active":null,"id":null,"uuid":null}', 
    '{"active":1,"id":2,"uuid":3}',
    '{"active":4,"id":5,"uuid":6, "x": {"y":7, "z":8, "a":{"b":9,"c":10}}}',
    '{"active":4,"id":5,"uuid":6, "x": {"y":7, "z":8, "a":{"b":9,"c":null}}}']

df = spark.read.json(spark.sparkContext.parallelize(data))

def flatten(field, prefix=""):
    if isinstance(field, T.StructType):
        for f in field:
            yield from flatten(f, prefix)
    elif isinstance(field, T.StructField) and isinstance(field.dataType, T.StructType):
        for f in field.dataType:
            yield from flatten(f, prefix + field.name + ".")
    else:
        yield prefix + field.name

cols_are_null = [F.col(c).isNull() for c in flatten(df.schema)]
all_cols_are_null = reduce(lambda l,r: l & r, cols_are_null)
df.withColumn("all_cols_are_null", all_cols_are_null).show()

#+------+----+----+-----------------+-----------------+
#|active|  id|uuid|                x|all_cols_are_null|
#+------+----+----+-----------------+-----------------+
#|  null|null|null|             null|             true|
#|     1|   2|   3|             null|            false|
#|     4|   5|   6|  {{9, 10}, 7, 8}|            false|
#|     4|   5|   6|{{9, null}, 7, 8}|            false|
#+------+----+----+-----------------+-----------------+

关于apache-spark - 如何在spark查询中不硬编码任何列名的情况下检查一行的所有列是否为空?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67804603/

相关文章:

python - 派斯帕克 : casting string to float when reading a csv file

hadoop - 如何将存储在包含行的HDFS中的文本文件转换为Pyspark中的数据框?

python - PySpark 序列化 EOFError

apache-spark - 使用 Hbase 进行 Spark Streaming

r - Spark是否支持melt和dcast

scala - 使用 Spark 转换非常大的 JSON 文件的最快方法是什么?

java - Spark fat jar 在 YARN 上运行多个版本

scala - 在Apache Spark中删除空的DataFrame分区

python - 如何在 DataFrame 中将 Column 声明为分类特征以在 ml 中使用

apache-spark - 累积前几行中的数组(PySpark数据框)