python - PySpark 无法将字典的 RDD 转换为 DataFrame。错误 : can not accept object in type <class 'pyspark.sql.types.Row' >

标签 python apache-spark pyspark apache-spark-sql

我目前使用的是 Spark 1.4.1,无法将带有嵌套字典的字典转换为 Spark DataFrame。我将嵌套的 dict 转换为 Row,但它似乎不接受我的模式。

这是重现我的错误的代码:

from pyspark.sql import Row, SQLContext, types as pst
sqlContext = SQLContext(sc)

example_dict = Row(**{"name": "Mike", "data": Row(**{"age": 10, "like": True})})

example_rdd = sc.parallelize([example_dict])

nested_fields = [pst.StructField("age", pst.IntegerType(), True), 
                 pst.StructField("like", pst.BooleanType(), True)]

schema = pst.StructType([
               pst.StructField("data", pst.StructType(nested_fields), True),
               pst.StructField("name", pst.StringType(), True)
])

df = sqlContext.createDataFrame(example_rdd, schema)

TypeError: StructType(List(StructField(age,IntegerType,true),StructField(like,BooleanType,true))) can not accept object in type <class 'pyspark.sql.types.Row'>

我不确定为什么会收到此错误。以下是对象 rddschema:

>>> example_rdd.first()
Row(data=Row(age=10, like=True), name='Mike')

>>> schema
StructType(List(StructField(data,StructType(List(StructField(age,IntegerType,true),StructField(like,BooleanType,true))),true),StructField(name,StringType,true)))

我不确定我是否遗漏了什么,但模式似乎与对象匹配。 Spark 1.4.1 不接受行内行有什么原因吗?

请注意:这在 Spark 2.0.2 中不是问题,但不幸的是我在使用 Spark 1.4.1 的共享资源上,所以我需要暂时找到解决方法:(。任何帮助将不胜感激,在此先感谢!

最佳答案

发生这种情况是因为 Row 在 Spark 1.4 中不被接受为 StructType。接受的类型是:

pst._acceptable_types[pst.StructType]
(tuple, list)

Spark 进行简单检查:

type(obj) not in _acceptable_types[_type]

这显然不适用于 Row 对象。正确的条件,相当于当前版本中发生的情况,将是:

isinstance(obj, _acceptable_types[_type])

如果你想使用嵌套列,你可以使用普通的 Python tuple:

Row(**{"name": "Mike", "data": (10, True)})

((10, True), "Mike")

关于python - PySpark 无法将字典的 RDD 转换为 DataFrame。错误 : can not accept object in type <class 'pyspark.sql.types.Row' >,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40819434/

相关文章:

python - 无法使用带有复合行键(UTF8Type、DateType)的 Pycassa 插入 Cassandra 列族

java - Spark和MongoDB应用程序在Scala 2.10 maven构建错误

apache-spark - 了解 Spark 版本

PySpark 如何根据行值创建列

python - jdbc.SQLServerException : The "variant" data type is not supported

python - 按索引连接两个列表列表

python - Python 的 coerce() 是做什么用的?

python - 括号和引号被注入(inject)到 SQL 查询的返回结果中

hadoop - 如何从hdfs读取二进制文件?

scala - 如何在 IntelliJ IDEA 中为 Spark 应用程序设置日志记录级别?