I tried using a for loop to pass each column in a list to a UDF, but I get an error saying the DataFrame cannot find col_name. The list list_col currently holds two columns, but it can change, so I want code that works for any list of columns. This code concatenates the columns one row at a time; each row value is in a struct format, i.e. a list of lists. For every null value I have to substitute a space.
list_col = ['pcxreport', 'crosslinediscount']

def struct_generater12(row):
    list3 = []
    main_str = ''
    if row is None:
        list3.append(' ')
    else:
        for i in row:
            temp = ''
            if i is None:
                temp += ' '
            else:
                for j in i:
                    if j is None:
                        temp += ' '
                    else:
                        temp += str(j)
            list3.append(temp)
    for k in list3:
        main_str += k
    return main_str
A = udf(struct_generater12, returnType=StringType())
# z = addlinterestdetail_FDF1.withColumn("Concated_pcxreport", A(addlinterestdetail_FDF1.pcxreport))

for i in range(0, len(list_col) - 1):
    struct_col = 'Concate_'
    struct_col += list_col[i]
    col_name = list_col[i]
    z = addlinterestdetail_FDF1.withColumn(struct_col, A(addlinterestdetail_FDF1.col_name))
    struct_col = ''
    z.show()
Best answer
addlinterestdetail_FDF1.col_name means the column is literally named "col_name"; it does not access the string contained in the variable col_name.

When calling a UDF on a column, you can either pass its string name directly:

A(col_name)

or use the pyspark sql function col:

import pyspark.sql.functions as psf
z = addlinterestdetail_FDF1.withColumn(struct_col, A(psf.col(col_name)))
You should consider using pyspark sql functions for the concatenation instead of writing a UDF. First let's create a sample DataFrame with a nested structure:
import json
j = {'pcxreport':{'a': 'a', 'b': 'b'}, 'crosslinediscount':{'c': 'c', 'd': None, 'e': 'e'}}
jsonRDD = sc.parallelize([json.dumps(j)])
df = spark.read.json(jsonRDD)
df.printSchema()
df.show()
root
|-- crosslinediscount: struct (nullable = true)
| |-- c: string (nullable = true)
| |-- d: string (nullable = true)
| |-- e: string (nullable = true)
|-- pcxreport: struct (nullable = true)
| |-- a: string (nullable = true)
| |-- b: string (nullable = true)
+-----------------+---------+
|crosslinediscount|pcxreport|
+-----------------+---------+
| [c,null,e]| [a,b]|
+-----------------+---------+
We'll build a dictionary containing the nested column names:
list_col=['pcxreport','crosslinediscount']
list_subcols = dict()
for c in list_col:
    list_subcols[c] = df.select(c + '.*').columns
Now we can "flatten" the StructType, replace None with ' ', and concatenate:
import itertools
import pyspark.sql.functions as psf
df.select([c + '.*' for c in list_col])\
.na.fill({c:' ' for c in list(itertools.chain.from_iterable(list_subcols.values()))})\
.select([psf.concat(*sc).alias(c) for c, sc in list_subcols.items()])\
.show()
+---------+-----------------+
|pcxreport|crosslinediscount|
+---------+-----------------+
| ab| c e|
+---------+-----------------+
This Q&A on python-3.x - Pyspark: Pass dynamic Column in UDF is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/47237206/