apache-spark - Split a column containing JSON strings into columns each holding one key-value pair from the string

Tags: apache-spark pyspark apache-spark-sql azure-databricks azure-eventhub

I have a data frame that looks like the one below (a column named "value" that contains JSON strings). I send it to an Event Hub using the Kafka API, and then I want to read that data back from the Event Hub and apply some transformations to it. The data is received in binary format, as described in the Kafka documentation.
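For context, here is a minimal sketch of how such data can be read back from an Event Hub through its Kafka-compatible endpoint with Structured Streaming. The namespace, hub name, and connection string below are placeholders, not values from the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholders: substitute your own Event Hubs namespace, hub (topic) name
# and connection string.
bootstrap_servers = "MYNAMESPACE.servicebus.windows.net:9093"
event_hub_name = "my-event-hub"
connection_string = "Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;..."

# The Event Hubs Kafka endpoint authenticates over SASL_SSL, with the literal
# username "$ConnectionString" and the connection string as the password.
# On Databricks the login module class is prefixed with "kafkashaded.".
jaas_config = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    f'username="$ConnectionString" password="{connection_string}";'
)

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", event_hub_name)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", jaas_config)
    .load()
)

# The Kafka source exposes binary "key" and "value" columns; the JSON payload
# lives in "value" and must be cast to a string before it can be parsed.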


Here are a few rows in CSV format:

value
"{""id"":""e52f247c-f46c-4021-bc62-e28e56db1ad8"",""latitude"":""34.5016064725731"",""longitude"":""123.43996453687777""}"
"{""id"":""32782100-9b59-49c7-9d56-bb4dfc368a86"",""latitude"":""49.938541626415144"",""longitude"":""111.88360885971986""}"
"{""id"":""a72a600f-2b99-4c41-a388-9a24c00545c0"",""latitude"":""4.988768300413497"",""longitude"":""-141.92727675177588""}"
"{""id"":""5a5f056a-cdfd-4957-8e84-4d5271253509"",""latitude"":""41.802942545247134"",""longitude"":""90.45164573613573""}"
"{""id"":""d00d0926-46eb-45dd-9e35-ab765804340d"",""latitude"":""70.60161063520081"",""longitude"":""20.566520665122482""}"
"{""id"":""dda14397-6922-4bb6-9be3-a1546f08169d"",""latitude"":""68.400462882435"",""longitude"":""135.7167027587489""}"
"{""id"":""c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea"",""latitude"":""26.04757722355835"",""longitude"":""175.20227554031783""}"
"{""id"":""97f8f1cf-3aa0-49bb-b3d5-05b736e0c883"",""latitude"":""35.52624182094499"",""longitude"":""-164.18066699972852""}"
"{""id"":""6bed49bc-ee93-4ed9-893f-4f51c7b7f703"",""latitude"":""-24.319581484353847"",""longitude"":""85.27338980948076""}"

What I want to do is apply a transformation and create a data frame with 3 columns: one holding the id, one the latitude, and one the longitude. This is what I tried, but the result is not what I expected:

from pyspark.sql.types import StructType
from pyspark.sql.functions import from_json
from pyspark.sql import functions as F

# df is the data frame received from Kafka
location_schema = StructType().add("id", "string").add("latitude", "float").add("longitude", "float")
string_df = df.selectExpr("CAST(value AS STRING)").withColumn("value", from_json(F.col("value"), location_schema))
string_df.printSchema()
string_df.show()

This is the result:

[screenshot: printSchema/show output showing a single struct-typed "value" column with nested id, latitude and longitude fields]

It created a "value" column with the whole struct as the value. Any idea how I can get 3 separate columns, as I described?

Best Answer

Your df:

df = spark.createDataFrame(
    [
        (1, '{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}'),
        (2, '{"id":"32782100-9b59-49c7-9d56-bb4dfc368a86","latitude":"49.938541626415144","longitude":"111.88360885971986"}'),
        (3, '{"id":"a72a600f-2b99-4c41-a388-9a24c00545c0","latitude":"4.988768300413497","longitude":"-141.92727675177588"}'),
        (4, '{"id":"5a5f056a-cdfd-4957-8e84-4d5271253509","latitude":"41.802942545247134","longitude":"90.45164573613573"}'),
        (5, '{"id":"d00d0926-46eb-45dd-9e35-ab765804340d","latitude":"70.60161063520081","longitude":"20.566520665122482"}'),
        (6, '{"id":"dda14397-6922-4bb6-9be3-a1546f08169d","latitude":"68.400462882435","longitude":"135.7167027587489"}'),
        (7, '{"id":"c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea","latitude":"26.04757722355835","longitude":"175.20227554031783"}'),
        (8, '{"id":"97f8f1cf-3aa0-49bb-b3d5-05b736e0c883","latitude":"35.52624182094499","longitude":"-164.18066699972852"}'),
        (9, '{"id":"6bed49bc-ee93-4ed9-893f-4f51c7b7f703","latitude":"-24.319581484353847","longitude":"85.27338980948076"}')
    ],
    ['id', 'value']
).drop('id')

+--------------------------------------------------------------------------------------------------------------+
|value                                                                                                         |
+--------------------------------------------------------------------------------------------------------------+
|{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}  |
|{"id":"32782100-9b59-49c7-9d56-bb4dfc368a86","latitude":"49.938541626415144","longitude":"111.88360885971986"}|
|{"id":"a72a600f-2b99-4c41-a388-9a24c00545c0","latitude":"4.988768300413497","longitude":"-141.92727675177588"}|
|{"id":"5a5f056a-cdfd-4957-8e84-4d5271253509","latitude":"41.802942545247134","longitude":"90.45164573613573"} |
|{"id":"d00d0926-46eb-45dd-9e35-ab765804340d","latitude":"70.60161063520081","longitude":"20.566520665122482"} |
|{"id":"dda14397-6922-4bb6-9be3-a1546f08169d","latitude":"68.400462882435","longitude":"135.7167027587489"}    |
|{"id":"c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea","latitude":"26.04757722355835","longitude":"175.20227554031783"} |
|{"id":"97f8f1cf-3aa0-49bb-b3d5-05b736e0c883","latitude":"35.52624182094499","longitude":"-164.18066699972852"}|
|{"id":"6bed49bc-ee93-4ed9-893f-4f51c7b7f703","latitude":"-24.319581484353847","longitude":"85.27338980948076"}|
+--------------------------------------------------------------------------------------------------------------+

Then:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType

json_schema = StructType([
                          StructField("id", StringType(), True),
                          StructField("latitude", FloatType(), True),
                          StructField("longitude", FloatType(), True)
                         ])

df\
    .withColumn('json', F.from_json(F.col('value'), json_schema))\
    .select(F.col('json').getItem('id').alias('id'),
            F.col('json').getItem('latitude').alias('latitude'),
            F.col('json').getItem('longitude').alias('longitude')
           )\
    .show(truncate=False)

+------------------------------------+-------------------+-------------------+
|id                                  |latitude           |longitude          |
+------------------------------------+-------------------+-------------------+
|e52f247c-f46c-4021-bc62-e28e56db1ad8|34.5016064725731   |123.43996453687777 |
|32782100-9b59-49c7-9d56-bb4dfc368a86|49.938541626415144 |111.88360885971986 |
|a72a600f-2b99-4c41-a388-9a24c00545c0|4.988768300413497  |-141.92727675177588|
|5a5f056a-cdfd-4957-8e84-4d5271253509|41.802942545247134 |90.45164573613573  |
|d00d0926-46eb-45dd-9e35-ab765804340d|70.60161063520081  |20.566520665122482 |
|dda14397-6922-4bb6-9be3-a1546f08169d|68.400462882435    |135.7167027587489  |
|c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea|26.04757722355835  |175.20227554031783 |
|97f8f1cf-3aa0-49bb-b3d5-05b736e0c883|35.52624182094499  |-164.18066699972852|
|6bed49bc-ee93-4ed9-893f-4f51c7b7f703|-24.319581484353847|85.27338980948076  |
+------------------------------------+-------------------+-------------------+
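As a side note, once the JSON string has been parsed into a struct, all of its fields can be expanded at once with a star expansion instead of selecting each one by name. A minimal sketch, reusing the df and json_schema defined above:

df\
    .withColumn('json', F.from_json(F.col('value'), json_schema))\
    .select('json.*')\
    .show(truncate=False)

Applied to the string_df from the question, where from_json already produced a struct-typed "value" column, the equivalent would be string_df.select("value.*").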

Regarding apache-spark - splitting a column containing JSON strings into columns each holding one key-value pair from the string, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/72815754/
