I have a JSON line that looks like this:
{"test":"valid2","workflowId":79370,"email":"d1@example.com"}{"email":"d1@example.com","eventName":"emailOpen","dataFields":{"campaignId":1125010,"ip":"100.100.200.243","userAgentDevice":"Gmail","messageId":"be4e071c11594bb0b4ee3c444fd08b99","emailId":"d1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"DWH TEST 06042020 WITH CALL","locale":null,"templateId":1576122,"emailSubject":"DWH TEST","labels":[],"createdAt":"2020-04-06 15:06:16 +00:00","templateName":"DWH TEST","messageTypeId":27043,"experimentId":79413,"campaignName":"DWH Test Automation","workflowId":79370,"email":"d1@example.com","channelId":24365}}{"email":"dd1@example.com","eventName":"emailOpen","dataFields":{"campaignId":1100,"ip":"50.100.200.243","userAgentDevice":"Gmail","messageId":"zz4e071c11594bb0b4ee3c444fd08b99","emailId":"dd1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"TEST","locale":null,"templateId":1576122,"emailSubject":"TEST","labels":"Cambbridge test","createdAt":"2020-04-10 15:06:16 +00:00","templateName":"TEST","messageTypeId":27043,"experimentId":89413,"campaignName":"Cambridge Test","workflowId":18370,"email":"dd1@example.com","channelId":1111}}{"email":"dd1@example.com","eventName":"emailClick","dataFields":{"campaignId":1100,"ip":"50.100.200.243","userAgentDevice":"Gmail","messageId":"zzee071c11594bb0b4ee3c444fd08b99","emailId":"dd1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"TEST","locale":null,"templateId":1576122,"emailSubject":"TEST","labels":"Cambbridge test","createdAt":"2020-04-10 15:08:16 +00:00","templateName":"TEST","messageTypeId":27043,"experimentId":89413,"campaignName":"Cambridge Test","workflowId":18370,"email":"dd1@example.com","channelId":1111}}{"test":"valid2","workflowId":79370,"email":"d1@example.com"}{"email":"d1@example.com","eventName":"emailOpen","dataFields":{"campaignId":1125010,"ip":"100.100.200.243","userAgentDevice":"Gmail","messageId":"be4e071c11594bb0b4ee3c444fd08b99","emailId":"d1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"DWH TEST 06042020 WITH CALL","locale":null,"templateId":1576122,"emailSubject":"DWH TEST","labels":[],"createdAt":"2020-04-06 15:06:16 +00:00","templateName":"DWH TEST","messageTypeId":27043,"experimentId":79413,"campaignName":"DWH Test Automation","workflowId":79370,"email":"d1@example.com","channelId":24365}}
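As you can see, there are multiple JSON objects on a single line. I need to associate the extra JSON object {"test":"valid2","workflowId":79370,"email":"d1@example.com"} with all (or any) of the event JSON objects that follow it, whenever the workflowId and email of the extra JSON match the workflowId and email of the event.
A single event can have multiple such extra JSON objects. I don't know how to process a file like this with a combination of Python and PySpark. Using PySpark is mandatory. I tried: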
df = sql_context.read.json('test.json')
df.show()
But the output contains only the extra JSON:
+--------------+------+----------+
|         email|  test|workflowId|
+--------------+------+----------+
|c1@example.com|valid1|     79370|
+--------------+------+----------+
I would like the output to look like this:
   id                                email           event      workflow_id  custom  createdatdate  createdattime
0  be4e071c11594bb0b4ee3c444fd08b99  d1@example.com  emailOpen  79370        valid2  2020414        154248
1  be4e071c11594bb0b4ee3c444fd08b99  d1@example.com  emailOpen  79370        valid2  2020414        154248
Can anyone guide me on how to process a file like this and get the resulting DataFrame with PySpark?
Best answer
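Since this is malformed JSON (several objects concatenated on one line), I suggest running a preprocessing step that repairs the file. This can be done easily with the jq command-line utility; see the jq documentation. The -c flag requests compact output, which produces newline-delimited JSON (one object per line) instead of pretty-printed output.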
jq -c . test.json > test_repaired.json
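If jq is not available, the same repair can be sketched in plain Python with the standard library's json.JSONDecoder.raw_decode, which parses one value and reports where it ended. This is a minimal sketch, not part of the original answer: the function name split_concatenated_json and the shortened sample string are assumptions made for illustration.

```python
import json


def split_concatenated_json(text):
    """Yield each top-level JSON value from a string of back-to-back values."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here.
        if text[pos].isspace():
            pos += 1
            continue
        # Parse one value starting at pos; pos moves to just past that value.
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


# Shortened, hypothetical stand-in for the contents of test.json.
sample = (
    '{"test":"valid2","workflowId":79370,"email":"d1@example.com"}'
    '{"email":"d1@example.com","eventName":"emailOpen",'
    '"dataFields":{"workflowId":79370}}'
)

# One compact JSON document per line, comparable to what `jq -c .` emits.
repaired = "\n".join(
    json.dumps(obj, separators=(",", ":"))
    for obj in split_concatenated_json(sample)
)
print(repaired)
```

Writing the repaired text to test_repaired.json gives a newline-delimited file of the same shape as the jq output.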
You can then read the file with Spark as follows:
>>> spark \
... .read \
... .json('test_repaired.json') \
... .show()
+--------------------+---------------+----------+------+----------+
|          dataFields|          email| eventName|  test|workflowId|
+--------------------+---------------+----------+------+----------+
|                null| d1@example.com|      null|valid2|     79370|
|{1125010, DWH Tes...| d1@example.com| emailOpen|  null|      null|
|{1100, Cambridge ...|dd1@example.com| emailOpen|  null|      null|
|{1100, Cambridge ...|dd1@example.com|emailClick|  null|      null|
|                null| d1@example.com|      null|valid2|     79370|
|{1125010, DWH Tes...| d1@example.com| emailOpen|  null|      null|
+--------------------+---------------+----------+------+----------+
Regarding "python - parsing multiple JSON objects on one line in PySpark", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/69351498/