python - How to get a list from a string in PySpark

Tags: python apache-spark pyspark apache-spark-sql

Is there an equivalent of the eval function in PySpark?

I am trying to convert Python code into PySpark.

I am querying a dataframe, and one of its columns holds data that looks like the following, but in string format:

[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}]

Assume "x" is the column in the dataframe that holds this value.

Now I want to pass the string column "x" in and get a list back, so that I can pass it to a mapPartitions function.

I want to avoid iterating over every row on the driver, which is why I am thinking along these lines.

If I use Python's eval() function on it, I get the following output:

x = "[{u'date': u'2015-02-08', u'by': u'<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="98f9fafbd8ffffb6fbf7f5" rel="noreferrer noopener nofollow">[email protected]</a>', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="c3a7a5a483baa2a2eda0acae" rel="noreferrer noopener nofollow">[email protected]</a>', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="691e1b1e0c2901061d470a0604" rel="noreferrer noopener nofollow">[email protected]</a>', u'value': u'ufc'}]"

list = eval(x)

for i in list:  print i

Output (this is also what I want in PySpark):

{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}
{u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}
{u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}
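
As an aside, ast.literal_eval from the standard library parses the same literal as eval does here, without evaluating arbitrary code; a minimal plain-Python sketch:

import ast

x = "[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}]"

# literal_eval only accepts Python literals (lists, dicts, strings, ...),
# so it is the safe stand-in for eval in this situation
for item in ast.literal_eval(x):
    print(item)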

How do I do this in PySpark?

Best Answer

You can benefit from using the from_json function to convert the JSON string into an actual JSON structure. For that you have to define a schema that matches the JSON string. Finally, use the explode function to separate the array of structs into different rows, just as you would get with eval.

If your data is

x = "[{u'date': u'2015-02-08', u'by': u'<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="91f0f3f2d1f6f6bff2fefc" rel="noreferrer noopener nofollow">[email protected]</a>', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="a3c7c5c4e3dac2c28dc0ccce" rel="noreferrer noopener nofollow">[email protected]</a>', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="fc8b8e8b99bc949388d29f9391" rel="noreferrer noopener nofollow">[email protected]</a>', u'value': u'ufc'}]"

then create the dataframe:

df = sqlContext.createDataFrame([(x,),], ["x"])

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|x                                                                                                                                                                                                                |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}]|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


root
 |-- x: string (nullable = true)

Using json

As explained, you need a schema plus the regexp_replace, from_json and explode functions, as follows:

from pyspark.sql import types as T
schema = T.ArrayType(T.StructType([T.StructField('date', T.StringType()), T.StructField('by', T.StringType()), T.StructField('value', T.StringType())]))

from pyspark.sql import functions as F
# strip the u'' prefixes so the value parses as JSON, then parse with the schema and explode the array
df = df.withColumn("x", F.explode(F.from_json(F.regexp_replace(df['x'], "(u')", "'"), schema=schema)))

This should give you

+-----------------------------------+
|x                                  |
+-----------------------------------+
|[2015-02-08,abc@gg.com,NA]         |
|[2016-02-08,dfg@yaa.com,applicable]|
|[2017-02-08,wrwe@hot.com,ufc]      |
+-----------------------------------+

root
 |-- x: struct (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- by: string (nullable = true)
 |    |-- value: string (nullable = true)
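
If you would rather have plain columns than a single struct column, you can select the nested fields out of the struct; a short illustrative step (df_flat is just a name chosen here):

# expand the struct's fields into top-level columns
df_flat = df.select("x.*")
df_flat.show(truncate=False)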

If you need the JSON string as mentioned in the question, you can use the to_json function:

df = df.withColumn("x", F.to_json(df['x']))

This will give you

+-------------------------------------------------------------+
|x                                                            |
+-------------------------------------------------------------+
|{"date":"2015-02-08","by":"<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="711013123116165f121e1c" rel="noreferrer noopener nofollow">[email protected]</a>","value":"NA"}         |
|{"date":"2016-02-08","by":"<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="482c2e2f08312929662b2725" rel="noreferrer noopener nofollow">[email protected]</a>","value":"applicable"}|
|{"date":"2017-02-08","by":"<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="6710151002270f08134904080a" rel="noreferrer noopener nofollow">[email protected]</a>","value":"ufc"}      |
+-------------------------------------------------------------+
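
With the column holding proper JSON strings again, the mapPartitions step from the question could look roughly like this (a sketch only; parse_partition is a hypothetical name, and the parsing runs on the executors rather than the driver):

import json

def parse_partition(rows):
    # rows is an iterator over the Row objects of a single partition
    for row in rows:
        yield json.loads(row['x'])

parsed = df.rdd.mapPartitions(parse_partition)
print(parsed.take(3))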

Using strings only

If you don't want to go through all the complexity of JSON, you can simply work with strings. For that you need the nested regexp_replace, split and explode functions, as follows:

from pyspark.sql import functions as F
# drop the u'' prefixes, strip brackets and whitespace, mark dict boundaries with ;&;, then split and explode
df = df.withColumn("x", F.explode(F.split(F.regexp_replace(F.regexp_replace(F.regexp_replace(df['x'], "(u')", "'"), "[\\[\\]\\s]", ""), "},\\{", "};&;{"), ";&;")))

This should give you

+-------------------------------------------------------------+
|x                                                            |
+-------------------------------------------------------------+
|{'date':'2015-02-08','by':'abc@gg.com','value':'NA'}         |
|{'date':'2016-02-08','by':'dfg@yaa.com','value':'applicable'}|
|{'date':'2017-02-08','by':'wrwe@hot.com','value':'ufc'}      |
+-------------------------------------------------------------+
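
Note that these values use single quotes, so json.loads would reject them; if you need actual Python dicts downstream, ast.literal_eval can parse them back, for example (again only a sketch):

import ast

# each row value is a single-quoted dict literal, which literal_eval accepts
dicts = df.rdd.map(lambda row: ast.literal_eval(row['x']))
print(dicts.take(3))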

On python - How to get a list from a string in PySpark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49204024/
