python - 如何在 PySpark 中将列从字符串转换为数组

标签 python dataframe apache-spark pyspark apache-spark-sql

我有一个从继承的数据集转换而来的数据框,如下所示:

data = [("[]","2000","M",False),
            ("[{'username':'aabb','points':'200','active':'true'}, {'username':'bbaa22','points':'0','active':'false'}]","1999","F",True),
            ("[{'username':'topuser','points':'855','active':'false'}]","1974","M",True),
            ("[]","2005","F",False),
            ("[{'username':'myprimary','points':'1050','active':'true'}, {'username':'mylurk','points':'100','active':'true'}, {'username':'closedlurk','points':'50','active':'false'}]","1992","M",True)
            ]

columns=["user","dob_year","gender","member"]
df=spark.createDataFrame(data,columns)
+--------------------+--------+------+------+
|                user|dob_year|gender|member|
+--------------------+--------+------+------+
|                  []|    2000|     M| false|
|[{'username':'aab...|    1999|     F|  true|
|[{'username':'top...|    1974|     M|  true|
|                  []|    2005|     F| false|
|[{'username':'myp...|    1992|     M|  true|
+--------------------+--------+------+------+

我需要从 user 列中提取一些元素,并尝试使用 pyspark explode 函数。

from pyspark.sql.functions import explode

df2 = df.select(explode(df.user), df.dob_year)

当我尝试执行此操作时,遇到以下错误:

AnalysisException: cannot resolve 'explode(user)' due to data type mismatch: input to function explode should be array or map type, not string;

当我运行 df.printSchema() 时,我意识到用户列是字符串,而不是根据需要列出。

我还尝试通过创建 UDF 将列中的字符串转换为数组

import pyspark.sql.functions as f

df2 = df.withColumn("user", str2list_udf(f.col("user"))).withColumn("user",df.user.cast(ArrayType(StringType())))

当我这样做时,遇到以下错误:

AnalysisException: cannot resolve 'user' due to data type mismatch: cannot cast string to array;

如何将此列中的数据转换或转换为数组,以便可以利用爆炸函数并将各个键解析为自己的列(例如:为 usernamepointsactive 提供单独的列)?如果爆炸不是最好的方法,我还应该遵循其他路线吗?

如果有帮助,这里是我创建的 UDF:

def str2list(x):
  if x == '[]':
    return list()
  else:
    return list(x)

str2list_udf = udf(lambda x: str2list(x))

谢谢

最佳答案

使用 from_json 将字符串化数组转换为结构数组,并分解结果数组:

from pyspark.sql import functions as F
from pyspark.sql.types import StructField, ArrayType, StringType, StructType

user_schema = ArrayType(
    StructType([
        StructField("username", StringType(), True),
        StructField("points", StringType(), True),
        StructField("active", StringType(), True)
    ])
)

df1 = (df.withColumn("user", F.from_json("user", user_schema))
       .selectExpr("inline(user)", "dob_year", "gender", "member")
       )

df1.show()
#+----------+------+------+--------+------+------+
#|  username|points|active|dob_year|gender|member|
#+----------+------+------+--------+------+------+
#|      aabb|   200|  true|    1999|     F|  true|
#|    bbaa22|     0| false|    1999|     F|  true|
#|   topuser|   855| false|    1974|     M|  true|
#| myprimary|  1050|  true|    1992|     M|  true|
#|    mylurk|   100|  true|    1992|     M|  true|
#|closedlurk|    50| false|    1992|     M|  true|
#+----------+------+------+--------+------+------+

关于python - 如何在 PySpark 中将列从字符串转换为数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71055297/

相关文章:

python - 修复 matplotlib 图

python - 从字符串中解析始发城市/目的地城市

python - 正则表达式多个相同模式/重复捕获无法正常工作,仅匹配第一个和最后一个

python - 如何使具有重复日期时间索引条目的数据框唯一?

amazon-web-services - 从 Pyspark 调用 AWS S3 存储桶时出错。 AWS 错误代码 : null, AWS 错误消息:错误请求

apache-spark - ApplicationMaster-在YARN中运行的不同类型的应用程序是否有所不同?

python - 向量化连续 numpy 计算

python - 在 pandas 中查找重复项并按日期使用非 Nan 值修改它们

python - 如何在 Python pandas DataFrame 中对列值进行切片

apache-spark - 在 pycharm 上使用 pyspark