python - 使用 PySpark 将 JSON 文件读取为 Pyspark Dataframe?

标签 python apache-spark pyspark apache-spark-sql

如何使用 PySpark 读取以下 JSON 结构以激发数据帧?

我的 JSON 结构

{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}

我试过:

df = spark.read.json('simple.json');

我希望输出 a、b、c 作为列,值作为相应的行。

谢谢。

最佳答案

Json 字符串变量

如果你有 json 字符串作为变量 那么你可以这样做

simple_json = '{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}'
rddjson = sc.parallelize([simple_json])
df = sqlContext.read.json(rddjson)

from pyspark.sql import functions as F
df.select(F.explode(df.results).alias('results')).select('results.*').show(truncate=False)

这会给你

+---+---+----+
|a  |b  |c   |
+---+---+----+
|1  |2  |name|
|2  |5  |foo |
+---+---+----+

Json 字符串作为文件中的单独行(sparkContext 和 sqlContext)

如果你有 json 字符串作为文件中的单独行 那么你可以使用 sparkContext 将其读入 rdd[string] 如上所述,其余过程相同如上

rddjson = sc.textFile('/home/anahcolus/IdeaProjects/pythonSpark/test.csv')
df = sqlContext.read.json(rddjson)
df.select(F.explode(df['results']).alias('results')).select('results.*').show(truncate=False)

Json 字符串作为文件中的单独行(仅限 sqlContext)

如果你有 json 字符串作为文件中的单独行 那么你可以只使用 sqlContext。但是这个过程很复杂,因为您必须为其创建架构

df = sqlContext.read.text('path to the file')

from pyspark.sql import functions as F
from pyspark.sql import types as T
df = df.select(F.from_json(df.value, T.StructType([T.StructField('results', T.ArrayType(T.StructType([T.StructField('a', T.IntegerType()), T.StructField('b', T.IntegerType()), T.StructField('c', T.StringType())])))])).alias('results'))
df.select(F.explode(df['results.results']).alias('results')).select('results.*').show(truncate=False)

应该给你和上面一样的结果

希望回答对你有帮助

关于python - 使用 PySpark 将 JSON 文件读取为 Pyspark Dataframe?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49399245/

相关文章:

apache-spark - Parquet 文件大小,消防软管与 Spark

scala - 在 Spark 数据集中滚动你自己的 reduceByKey

pandas - java.lang.NoSuchMethodError : com. google.flatbuffers.FlatBufferBuilder.createString(Ljava/lang/CharSequence;)我

Elasticsearch -pyspark : not getting specific fields from document (getting all fields) even after specifying with spark

Python:最快的 list.index() 操作使用 'is' ,而不是 '==' (即通过引用)

python - Tensorflow Deeplab 图像颜色图去除混淆

apache-spark - pyspark。数据框中的 zip 数组

python - 如何将 Pyspark Dataframe header 设置为另一行?

python - 如何在 seaborn 中检索错误栏

python - 如何在 Python 中使用排序键函数调用?