我有一个如下所示的数据框(名为“value”的列,其中包含 JSON 字符串)。我使用 Kafka API 将其发送到事件中心,然后我想从事件中心读取该数据并对其应用一些转换。数据以二进制格式接收,如 Kafka 文档中所述。
以下是 CSV 格式的几列:
value
"{""id"":""e52f247c-f46c-4021-bc62-e28e56db1ad8"",""latitude"":""34.5016064725731"",""longitude"":""123.43996453687777""}"
"{""id"":""32782100-9b59-49c7-9d56-bb4dfc368a86"",""latitude"":""49.938541626415144"",""longitude"":""111.88360885971986""}"
"{""id"":""a72a600f-2b99-4c41-a388-9a24c00545c0"",""latitude"":""4.988768300413497"",""longitude"":""-141.92727675177588""}"
"{""id"":""5a5f056a-cdfd-4957-8e84-4d5271253509"",""latitude"":""41.802942545247134"",""longitude"":""90.45164573613573""}"
"{""id"":""d00d0926-46eb-45dd-9e35-ab765804340d"",""latitude"":""70.60161063520081"",""longitude"":""20.566520665122482""}"
"{""id"":""dda14397-6922-4bb6-9be3-a1546f08169d"",""latitude"":""68.400462882435"",""longitude"":""135.7167027587489""}"
"{""id"":""c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea"",""latitude"":""26.04757722355835"",""longitude"":""175.20227554031783""}"
"{""id"":""97f8f1cf-3aa0-49bb-b3d5-05b736e0c883"",""latitude"":""35.52624182094499"",""longitude"":""-164.18066699972852""}"
"{""id"":""6bed49bc-ee93-4ed9-893f-4f51c7b7f703"",""latitude"":""-24.319581484353847"",""longitude"":""85.27338980948076""}"
我想要做的是应用转换并创建一个包含 3 列的数据框,一列包含 id,一列包含纬度,一列包含经度。 这是我尝试过的,但结果不是我所期望的:
from pyspark.sql.types import StructType
from pyspark.sql.functions import from_json
from pyspark.sql import functions as F
# df is the data frame received from Kafka
location_schema = StructType().add("id", "string").add("latitude", "float").add("longitude", "float")
string_df = df.selectExpr("CAST(value AS STRING)").withColumn("value", from_json(F.col("value"), location_schema))
string_df.printSchema()
string_df.show()
这就是结果:
它创建了一个“值”列,并将结构作为值。知道如何获得 3 个不同的列吗?正如我所描述的那样?
最佳答案
您的 df:
df = spark.createDataFrame(
[
(1, '{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}'),
(2, '{"id":"32782100-9b59-49c7-9d56-bb4dfc368a86","latitude":"49.938541626415144","longitude":"111.88360885971986"}'),
(3, '{"id":"a72a600f-2b99-4c41-a388-9a24c00545c0","latitude":"4.988768300413497","longitude":"-141.92727675177588"}'),
(4, '{"id":"5a5f056a-cdfd-4957-8e84-4d5271253509","latitude":"41.802942545247134","longitude":"90.45164573613573"}'),
(5, '{"id":"d00d0926-46eb-45dd-9e35-ab765804340d","latitude":"70.60161063520081","longitude":"20.566520665122482"}'),
(6, '{"id":"dda14397-6922-4bb6-9be3-a1546f08169d","latitude":"68.400462882435","longitude":"135.7167027587489"}'),
(7, '{"id":"c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea","latitude":"26.04757722355835","longitude":"175.20227554031783"}'),
(8, '{"id":"97f8f1cf-3aa0-49bb-b3d5-05b736e0c883","latitude":"35.52624182094499","longitude":"-164.18066699972852"}'),
(9, '{"id":"6bed49bc-ee93-4ed9-893f-4f51c7b7f703","latitude":"-24.319581484353847","longitude":"85.27338980948076"}')
],
['id', 'value']
).drop('id')
+--------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------+
|{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"} |
|{"id":"32782100-9b59-49c7-9d56-bb4dfc368a86","latitude":"49.938541626415144","longitude":"111.88360885971986"}|
|{"id":"a72a600f-2b99-4c41-a388-9a24c00545c0","latitude":"4.988768300413497","longitude":"-141.92727675177588"}|
|{"id":"5a5f056a-cdfd-4957-8e84-4d5271253509","latitude":"41.802942545247134","longitude":"90.45164573613573"} |
|{"id":"d00d0926-46eb-45dd-9e35-ab765804340d","latitude":"70.60161063520081","longitude":"20.566520665122482"} |
|{"id":"dda14397-6922-4bb6-9be3-a1546f08169d","latitude":"68.400462882435","longitude":"135.7167027587489"} |
|{"id":"c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea","latitude":"26.04757722355835","longitude":"175.20227554031783"} |
|{"id":"97f8f1cf-3aa0-49bb-b3d5-05b736e0c883","latitude":"35.52624182094499","longitude":"-164.18066699972852"}|
|{"id":"6bed49bc-ee93-4ed9-893f-4f51c7b7f703","latitude":"-24.319581484353847","longitude":"85.27338980948076"}|
+--------------------------------------------------------------------------------------------------------------+
然后:
from pyspark.sql import functions as F
from pyspark.sql.types import *
json_schema = StructType([
StructField("id", StringType(), True),
StructField("latitude", FloatType(), True),
StructField("longitude", FloatType(), True)
])
df\
.withColumn('json', F.from_json(F.col('value'), json_schema))\
.select(F.col('json').getItem('id').alias('id'),
F.col('json').getItem('latitude').alias('latitude'),
F.col('json').getItem('longitude').alias('longitude')
)\
.show(truncate=False)
+------------------------------------+-------------------+-------------------+
|id |latitude |longitude |
+------------------------------------+-------------------+-------------------+
|e52f247c-f46c-4021-bc62-e28e56db1ad8|34.5016064725731 |123.43996453687777 |
|32782100-9b59-49c7-9d56-bb4dfc368a86|49.938541626415144 |111.88360885971986 |
|a72a600f-2b99-4c41-a388-9a24c00545c0|4.988768300413497 |-141.92727675177588|
|5a5f056a-cdfd-4957-8e84-4d5271253509|41.802942545247134 |90.45164573613573 |
|d00d0926-46eb-45dd-9e35-ab765804340d|70.60161063520081 |20.566520665122482 |
|dda14397-6922-4bb6-9be3-a1546f08169d|68.400462882435 |135.7167027587489 |
|c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea|26.04757722355835 |175.20227554031783 |
|97f8f1cf-3aa0-49bb-b3d5-05b736e0c883|35.52624182094499 |-164.18066699972852|
|6bed49bc-ee93-4ed9-893f-4f51c7b7f703|-24.319581484353847|85.27338980948076 |
+------------------------------------+-------------------+-------------------+
关于apache-spark - 将包含 JSON 字符串的列拆分为每个包含字符串中的一个键值对的列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72815754/