python-3.x - 使用pyspark从元组列表创建DataFrame

标签 python-3.x pyspark spark-dataframe

我正在使用使用simple-salesforce软件包从SFDC提取的数据。
我正在使用Python3编写脚本和Spark 1.5.2。

我创建了一个包含以下数据的rdd:

[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')]
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')]
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')]
...


此数据在名为v_rdd的RDD中

我的架构如下所示:

StructType(List(StructField(Id,StringType,true),StructField(PackSize,StringType,true),StructField(Name,StringType,true)))


我正在尝试根据此RDD创建DataFrame:

sqlDataFrame = sqlContext.createDataFrame(v_rdd, schema)


我打印我的DataFrame:

sqlDataFrame.printSchema()


并获得以下信息:

+--------------------+--------------------+--------------------+
|                  Id|  PackSize|                          Name|
+--------------------+--------------------+--------------------+
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|


我期望看到这样的实际数据:

+------------------+------------------+--------------------+
|                Id|PackSize|                          Name|
+------------------+------------------+--------------------+
|a0w1a0000003xB1A  |               1.0|       A            |
|a0w1a0000003xAAI  |               1.0|       B            |
|a0w1a00000xB3AAI  |              30.0|       C            |


您能否帮我确定我在这里做错了什么。

我的Python脚本很长,我不确定人们浏览它会不会很方便,所以我只发布了遇到问题的部分。

在此先感谢一吨!

最佳答案

嘿,您下次可以提供一个有效的示例吗?那会容易些。

RDD的呈现方式基本上怪异于创建DataFrame。这是根据Spark文档创建DF的方式。

>>> l = [('Alice', 1)]
>>> sqlContext.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> sqlContext.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]


因此,对于您的示例,您可以按照以下方式创建所需的输出:

# Your data at the moment
data = sc.parallelize([ 
[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')],
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')],
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')]
    ])
# Convert to tuple
data_converted = data.map(lambda x: (x[0][1], x[1][1], x[2][1]))

# Define schema
schema = StructType([
    StructField("Id", StringType(), True),
    StructField("Packsize", StringType(), True),
    StructField("Name", StringType(), True)
])

# Create dataframe
DF = sqlContext.createDataFrame(data_converted, schema)

# Output
DF.show()
+----------------+--------+----+
|              Id|Packsize|Name|
+----------------+--------+----+
|a0w1a0000003xB1A|     1.0|   A|
|a0w1a0000003xAAI|     1.0|   B|
|a0w1a00000xB3AAI|    30.0|   C|
+----------------+--------+----+


希望这可以帮助

关于python-3.x - 使用pyspark从元组列表创建DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35001229/

相关文章:

python-3.x - 在 Google Cloud Build 中运行 python 单元测试

python-3.x - 使用开始和结束列在同一 Pandas 数据框中有效合并重叠间隔

表达式中的 Python 'in' 关键字与 for 循环中的关键字

apache-spark - 如何从计算机中删除现有的 spark 环境和相关包?

apache-spark - 如何将数据框中的 org.apache.spark.mllib.linalg.Vector 保存到 cassandra

performance - Python 3.3 在大循环期间变慢

apache-spark - 从pyspark读取hdfs中的文件

python - 使用 PySpark 展平嵌套 json 响应结构的最有效方法是什么?

apache-spark - Spark 数据帧十进制精度

apache-spark - Pyspark - 如何进行不区分大小写的数据帧连接?