python - 如果函数返回字典数组,如何构造与 UDF 一起使用的模式

标签 python dataframe pyspark

我正在尝试为下面提到的数据类型创建架构,它是与 udf 一起使用的字典列表,但我收到下面提到的错误。

 Unexpected tuple %r with StructType

 [{'cumulativeDefaultbalance': 0, 'loanId': 13131, 'cumulativeEndingBalance': 4877.9918745262694, 'cumulativeContractpaymentw': 263.67479214039736, 'month': 1, 'cumulativeInterestpayment': 141.66666666666666, 'cumulativePrincipalpayment': 122.00812547373067, 'cumulativeAdjbeginingbal': 5000, 'cumulativePrepaymentamt': 40.315417142065087}]

下面是我正在构建的架构对象

schema = StructType([
            StructField('cumulativeAdjbeginingbal', FloatType(), False),
            StructField('cumulativeEndingBalance', FloatType(), False),
            StructField('cumulativeContractpaymentw', FloatType(), False),
            StructField('cumulativeInterestpayment', FloatType(), False),
            StructField('cumulativePrincipalpayment', FloatType(), False),
            StructField('cumulativePrepaymentamt', FloatType(), False),
            StructField('cumulativeDefaultbalance', FloatType(), False)
        ])

谁能告诉我的代码失败的原因是什么?

最佳答案

据我所知,问题在于您定义的模式要求 rdd 元素采用列表形式而不是字典形式。因此,您可以在创建 DF 之前执行此操作(假设您的 dicts rdd 基本列表名为 df

df.map(lambda x: x.values)

或者,您可以执行以下操作并消除显式架构定义:

from pyspark.sql import Row
df.map(lambda x: Row(**x)).toDF()

编辑:实际上看起来该架构是针对 UDF 的返回类型的。我认为以下应该有效:

from pyspark.sql.types import ArrayType

schema = ArrayType(StructType([
        StructField('cumulativeAdjbeginingbal', FloatType(), False),
        StructField('cumulativeEndingBalance', FloatType(), False),
        StructField('cumulativeContractpaymentw', FloatType(), False),
        StructField('cumulativeInterestpayment', FloatType(), False),
        StructField('cumulativePrincipalpayment', FloatType(), False),
        StructField('cumulativePrepaymentamt', FloatType(), False),
        StructField('cumulativeDefaultbalance', FloatType(), False)
    ]), False)

关于python - 如果函数返回字典数组,如何构造与 UDF 一起使用的模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46450379/

相关文章:

python - Python 中 Selenium Webdriver 测试失败时的自动截图

Python-将数据框列格式化为不同的数据类型

python - 如何在 Spark DataFrame 中添加常量列?

pyspark - 通过对多列进行分组来用平均值填充缺失值

Python Dropbox 应用程序,我应该如何处理应用程序 key 和应用程序 secret ?

python - 主机中任务的指派/分配

python - 从数据框中完全消除行索引及其行

r - 将数据帧从一个转换为另一个

dataframe - 如何在pyspark中连接两个数组

python - 如何将 32 位整数编码为字节数组?