python-3.x - 派斯帕克 : Pass dynamic Column in UDF

标签 python-3.x hadoop apache-spark dataframe pyspark

尝试使用 for 循环在 UDF 中逐一发送列列表,但出现错误,即数据框找不到 col_name。目前在列表 list_col 中,我们有两列,但它可以更改。所以我想编写一个适用于每个列列表的代码。在这段代码中,我一次连接一行列,行值采用结构格式,即列表中的列表。对于每个空值,我都必须给空间。

    list_col=['pcxreport','crosslinediscount']
    def struct_generater12(row):
    list3 = []
    main_str = ''
    if(row is None):
        list3.append(' ')
    else:
        for i in row:
            temp = ''
            if(i is None):
                temp+= ' '
            else:
                for j in i:
                    if (j is None):
                        temp+= ' '
                    else:
                        temp+= str(j)
            list3.append(temp)
    for k in list3:
        main_str +=k
    return main_str


    A = udf(struct_generater12,returnType=StringType())
    # z = addlinterestdetail_FDF1.withColumn("Concated_pcxreport",A(addlinterestdetail_FDF1.pcxreport))
    for i in range(0,len(list_col)-1):
        struct_col='Concate_'
        struct_col+=list_col[i]
        col_name=list_col[i]
        z = addlinterestdetail_FDF1.withColumn(struct_col,A(addlinterestdetail_FDF1.col_name))
        struct_col=''

    z.show()

最佳答案

addlinterestdetail_FDF1.col_name 表示该列名为 "col_name",您没有访问变量 col_name 中包含的字符串。

在列上调用UDF时,您可以

  • 直接使用它的字符串名称:A(col_name)
  • 或者使用pyspark sql函数col:

    import pyspark.sql.functions as psf
    z = addlinterestdetail_FDF1.withColumn(struct_col,A(psf.col(col_name)))
    

您应该考虑使用 pyspark sql 函数进行连接,而不是编写 UDF。首先让我们创建一个具有嵌套结构的示例数据框:

import json
j = {'pcxreport':{'a': 'a', 'b': 'b'}, 'crosslinediscount':{'c': 'c', 'd': None, 'e': 'e'}}
jsonRDD = sc.parallelize([json.dumps(j)])
df = spark.read.json(jsonRDD)
df.printSchema()
df.show()

    root
     |-- crosslinediscount: struct (nullable = true)
     |    |-- c: string (nullable = true)
     |    |-- d: string (nullable = true)
     |    |-- e: string (nullable = true)
     |-- pcxreport: struct (nullable = true)
     |    |-- a: string (nullable = true)
     |    |-- b: string (nullable = true)

    +-----------------+---------+
    |crosslinediscount|pcxreport|
    +-----------------+---------+
    |       [c,null,e]|    [a,b]|
    +-----------------+---------+

我们将编写一个包含嵌套列名称的字典:

list_col=['pcxreport','crosslinediscount']
list_subcols = dict()
for c in list_col:
    list_subcols[c] = df.select(c+'.*').columns

现在我们可以“扁平化”StructType,将None替换为' ',然后连接:

import itertools
import pyspark.sql.functions as psf
df.select([c + '.*' for c in list_col])\
    .na.fill({c:' ' for c in list(itertools.chain.from_iterable(list_subcols.values()))})\
    .select([psf.concat(*sc).alias(c) for c, sc in list_subcols.items()])\
    .show()

    +---------+-----------------+
    |pcxreport|crosslinediscount|
    +---------+-----------------+
    |       ab|              c e|
    +---------+-----------------+

关于python-3.x - 派斯帕克 : Pass dynamic Column in UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47237206/

相关文章:

hadoop - 如何将多个文件加载到配置单元中的表中?

scala - 如何在 groupBy 之后聚合 map 列?

R dplyr 根据给定列的数值过滤行

python - 子类化 float 以强制 python 中的定点打印精度

python - Pi 计算器输出问题

python - 网络故障”从 chrome 和 Edge 浏览器下载 PDF 但它在 IE 和 firefox 中工作

Python try except 在 sql server 中删除或创建表

hadoop - 如何为从 Amazon EMR 应用程序到 S3 的每个上传请求设置 User-Agent(前缀)

python - 如何使用python将模型存储到hdfs

apache-spark - spark.python.worker.memory 是什么?