如何在 pyspark 的查询中不硬编码任何列名的情况下检查一行的所有列是否为空?我有一个对象类型的列,包含 json 格式的字符串。如果列的所有 json 字段都为空,我想引发异常。目前,如果我执行columnname.jsonfield,那么我的代码会按预期抛出异常,但我想检查所有字段是否为空。我无法对所有 json 字段进行硬编码,因为我的一些表包含 200 多个 json 字段
例如: 列名是值 {"active":null,"id":null,"uuid":null}
Value.active 工作正常,但我需要类似于 value.* 的东西,它在 pyspark 的 where 子句中不起作用。
最佳答案
您可以展平数据框中的(可能是嵌套的)列,然后检查展平结构的每个元素是否为空:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from functools import reduce
#create some testdata with nested structures
data = ['{"active":null,"id":null,"uuid":null}',
'{"active":1,"id":2,"uuid":3}',
'{"active":4,"id":5,"uuid":6, "x": {"y":7, "z":8, "a":{"b":9,"c":10}}}',
'{"active":4,"id":5,"uuid":6, "x": {"y":7, "z":8, "a":{"b":9,"c":null}}}']
df = spark.read.json(spark.sparkContext.parallelize(data))
def flatten(field, prefix=""):
if isinstance(field, T.StructType):
for f in field:
yield from flatten(f, prefix)
elif isinstance(field, T.StructField) and isinstance(field.dataType, T.StructType):
for f in field.dataType:
yield from flatten(f, prefix + field.name + ".")
else:
yield prefix + field.name
cols_are_null = [F.col(c).isNull() for c in flatten(df.schema)]
all_cols_are_null = reduce(lambda l,r: l & r, cols_are_null)
df.withColumn("all_cols_are_null", all_cols_are_null).show()
#+------+----+----+-----------------+-----------------+
#|active| id|uuid| x|all_cols_are_null|
#+------+----+----+-----------------+-----------------+
#| null|null|null| null| true|
#| 1| 2| 3| null| false|
#| 4| 5| 6| {{9, 10}, 7, 8}| false|
#| 4| 5| 6|{{9, null}, 7, 8}| false|
#+------+----+----+-----------------+-----------------+
关于apache-spark - 如何在spark查询中不硬编码任何列名的情况下检查一行的所有列是否为空?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67804603/