python - Parsing multiple JSON objects on a single line in PySpark

Tags: python json apache-spark pyspark

I have a JSON line that looks like this:

{"test":"valid2","workflowId":79370,"email":"d1@example.com"}{"email":"d1@example.com","eventName":"emailOpen","dataFields":{"campaignId":1125010,"ip":"100.100.200.243","userAgentDevice":"Gmail","messageId":"be4e071c11594bb0b4ee3c444fd08b99","emailId":"d1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"DWH TEST 06042020 WITH CALL","locale":null,"templateId":1576122,"emailSubject":"DWH TEST","labels":[],"createdAt":"2020-04-06 15:06:16 +00:00","templateName":"DWH TEST","messageTypeId":27043,"experimentId":79413,"campaignName":"DWH Test Automation","workflowId":79370,"email":"d1@example.com","channelId":24365}}{"email":"dd1@example.com","eventName":"emailOpen","dataFields":{"campaignId":1100,"ip":"50.100.200.243","userAgentDevice":"Gmail","messageId":"zz4e071c11594bb0b4ee3c444fd08b99","emailId":"dd1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"TEST","locale":null,"templateId":1576122,"emailSubject":"TEST","labels":"Cambbridge test","createdAt":"2020-04-10 15:06:16 +00:00","templateName":"TEST","messageTypeId":27043,"experimentId":89413,"campaignName":"Cambridge Test","workflowId":18370,"email":"dd1@example.com","channelId":1111}}{"email":"dd1@example.com","eventName":"emailClick","dataFields":{"campaignId":1100,"ip":"50.100.200.243","userAgentDevice":"Gmail","messageId":"zzee071c11594bb0b4ee3c444fd08b99","emailId":"dd1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"TEST","locale":null,"templateId":1576122,"emailSubject":"TEST","labels":"Cambbridge test","createdAt":"2020-04-10 15:08:16 +00:00","templateName":"TEST","messageTypeId":27043,"experimentId":89413,"campaignName":"Cambridge Test","workflowId":18370,"email":"dd1@example.com","channelId":1111}}{"test":"valid2","workflowId":79370,"email":"d1@example.com"}{"email":"d1@example.com","eventName":"emailOpen","dataFields":{"campaignId":1125010,"ip":"100.100.200.243","userAgentDevice":"Gmail","messageId":"be4e071c11594bb0b4ee3c444fd08b99","emailId":"d1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"DWH TEST 06042020 WITH CALL","locale":null,"templateId":1576122,"emailSubject":"DWH TEST","labels":[],"createdAt":"2020-04-06 15:06:16 +00:00","templateName":"DWH TEST","messageTypeId":27043,"experimentId":79413,"campaignName":"DWH Test Automation","workflowId":79370,"email":"d1@example.com","channelId":24365}}

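As you can see, there are multiple JSON objects on a single line. I need to associate the extra JSON object {"test":"valid2","workflowId":79370,"email":"d1@example.com"} with all/any of the event JSON objects that follow it, provided that the workflowId and email of the extra object match the workflowId and email of the event.

A single event can have several such extra JSON objects. I don't know how to prepare a file like this using a combination of Python and PySpark. Using PySpark is mandatory. I tried: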

df = sql_context.read.json('test.json')
df.show() 

But the output contains only the extra JSON object:

+--------------+------+----------+
|         email|  test|workflowId|
+--------------+------+----------+
|c1@example.com|valid1|     79370|
+--------------+------+----------+

I would like the output to look like this:

    id                                email           event      workflow_id  custom  createdatdate  createdattime
0   be4e071c11594bb0b4ee3c444fd08b99  d1@example.com  emailOpen  79370        valid2  2020414        154248
1   be4e071c11594bb0b4ee3c444fd08b99  d1@example.com  emailOpen  79370        valid2  2020414        154248

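Can anyone guide me on how to process a file like this and get the resulting DataFrame with PySpark?

Best answer

Because this is malformed JSON, I suggest running a preprocessing step that repairs the file. By default Spark's JSON reader expects one JSON object per line, which is why only the first object on the line shows up in your DataFrame. The repair is easy to do with the jq command-line utility.

The -c flag requests compact output, which produces newline-delimited JSON (one object per line) instead of pretty-printed output.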

jq -c . test.json > test_repaired.json
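
If jq is not available, the same repair can be sketched in plain Python. This is only a sketch under a few assumptions: the whole file fits in memory, and the names split_concatenated_json and repair_file are illustrative, not part of any library. It relies on json.JSONDecoder.raw_decode from the standard library, which parses one JSON value and returns the index where it stopped, so the next object can be parsed from there.

```python
import json


def split_concatenated_json(text):
    """Yield each top-level JSON value found back to back in `text`."""
    decoder = json.JSONDecoder()
    pos, end = 0, len(text)
    while pos < end:
        # raw_decode does not skip leading whitespace, so skip it here.
        while pos < end and text[pos].isspace():
            pos += 1
        if pos == end:
            break
        # Parse one value starting at `pos`; `pos` becomes the index just past it.
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


def repair_file(src_path, dst_path):
    """Rewrite `src_path` as newline-delimited JSON at `dst_path`."""
    with open(src_path, encoding="utf-8") as src, \
            open(dst_path, "w", encoding="utf-8") as dst:
        for obj in split_concatenated_json(src.read()):
            dst.write(json.dumps(obj) + "\n")
```

Calling repair_file("test.json", "test_repaired.json") should leave a file with one object per line, comparable to the jq output.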

You can then read the file with Spark as follows:

>>> spark \
...     .read \
...     .json('test_repaired.json') \
...     .show()
+--------------------+---------------+----------+------+----------+
|          dataFields|          email| eventName|  test|workflowId|
+--------------------+---------------+----------+------+----------+
|                null| d1@example.com|      null|valid2|     79370|
|{1125010, DWH Tes...| d1@example.com| emailOpen|  null|      null|
|{1100, Cambridge ...|dd1@example.com| emailOpen|  null|      null|
|{1100, Cambridge ...|dd1@example.com|emailClick|  null|      null|
|                null| d1@example.com|      null|valid2|     79370|
|{1125010, DWH Tes...| d1@example.com| emailOpen|  null|      null|
+--------------------+---------------+----------+------+----------+

Regarding python - Parsing multiple JSON objects on a single line in PySpark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/69351498/
