Is there an equivalent of the eval function in PySpark?
I am trying to convert some Python code to PySpark.
I am querying a dataframe in which one column holds data like the following, but in string format.
[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}]
Suppose "x" is the column of the dataframe that holds this value.
Now I want to take the string column "x" and get a list out of it, so that I can pass it to a mapPartitions function.
I want to avoid iterating over every row on the driver, which is why I am thinking along these lines.
In plain Python, using the eval() function, I get the following output:
x = "[{u'date': u'2015-02-08', u'by': u'<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="98f9fafbd8ffffb6fbf7f5" rel="noreferrer noopener nofollow">[email protected]</a>', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="c3a7a5a483baa2a2eda0acae" rel="noreferrer noopener nofollow">[email protected]</a>', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="691e1b1e0c2901061d470a0604" rel="noreferrer noopener nofollow">[email protected]</a>', u'value': u'ufc'}]"
list = eval(x)
for i in list: print i
Output (this is also what I want in PySpark):
{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}
{u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}
{u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}
How can I do this in PySpark?
Best answer
You can benefit from using the from_json function to convert the json string into actual json. For that you have to define a schema that matches the json string. Finally, use the explode function to separate the array of structs into different rows, just as you would get with eval.
If your data is
x = "[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}]"
then create the dataframe:
df = sqlContext.createDataFrame([(x,)], ["x"])
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|x                                                                                                                                                                                                                  |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-08', u'by': u'wrwe@hot.com', u'value': u'ufc'}]|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
root
|-- x: string (nullable = true)
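Note: on Spark 2.x you would typically create the dataframe through the SparkSession entry point rather than sqlContext; a minimal equivalent, assuming a session named spark:
df = spark.createDataFrame([(x,)], ["x"])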
Using json
As explained above, you need a schema, the regexp_replace function, the from_json function, and the explode function:
from pyspark.sql import types as T
from pyspark.sql import functions as F

# schema matching each {date, by, value} element of the json array
schema = T.ArrayType(T.StructType([T.StructField('date', T.StringType()), T.StructField('by', T.StringType()), T.StructField('value', T.StringType())]))

# drop the u'' prefixes (Spark's json reader accepts single quotes), parse, then explode the array into rows
df = df.withColumn("x", F.explode(F.from_json(F.regexp_replace(df['x'], "(u')", "'"), schema=schema)))
This should give you
+-----------------------------------+
|x                                  |
+-----------------------------------+
|[2015-02-08,abc@gg.com,NA]         |
|[2016-02-08,dfg@yaa.com,applicable]|
|[2017-02-08,wrwe@hot.com,ufc]      |
+-----------------------------------+
root
|-- x: struct (nullable = true)
| |-- date: string (nullable = true)
| |-- by: string (nullable = true)
| |-- value: string (nullable = true)
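Since x is now a struct, you can also flatten its fields into separate columns if that is more convenient; a small sketch, using the field names from the schema above:
df.select("x.date", "x.by", "x.value").show()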
If you need the json string back, as mentioned in the question, you can use the to_json function:
df = df.withColumn("x", F.to_json(df['x']))
This would give you
+-------------------------------------------------------------+
|x                                                            |
+-------------------------------------------------------------+
|{"date":"2015-02-08","by":"abc@gg.com","value":"NA"}         |
|{"date":"2016-02-08","by":"dfg@yaa.com","value":"applicable"}|
|{"date":"2017-02-08","by":"wrwe@hot.com","value":"ufc"}      |
+-------------------------------------------------------------+
Using string only
If you don't want to go through all the complexity of json, you can simply work with strings. For that you need the nested regexp_replace, split, and explode functions:
from pyspark.sql import functions as F

# drop the u'' prefixes, strip brackets and whitespace, mark the boundary
# between dicts with ;&;, split on that marker, and explode into rows
df = df.withColumn("x", F.explode(F.split(F.regexp_replace(F.regexp_replace(F.regexp_replace(df['x'], "(u')", "'"), "[\\[\\]\\s]", ""), "},\\{", "};&;{"), ";&;")))
This should give you
+-------------------------------------------------------------+
|x                                                            |
+-------------------------------------------------------------+
|{'date':'2015-02-08','by':'abc@gg.com','value':'NA'}         |
|{'date':'2016-02-08','by':'dfg@yaa.com','value':'applicable'}|
|{'date':'2017-02-08','by':'wrwe@hot.com','value':'ufc'}      |
+-------------------------------------------------------------+
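Finally, if you specifically want the eval-style list per row to feed into mapPartitions, as the question asks, you can drop down to the RDD and parse each string with Python's ast.literal_eval, a safer alternative to eval. A minimal sketch, assuming df still holds the raw string column x:
import ast
# parsing happens on the executors, not the driver; each element is one dict
rdd = df.rdd.flatMap(lambda row: ast.literal_eval(row['x']))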
On python - how to get a list from a string in PySpark, the original question can be found on Stack Overflow: https://stackoverflow.com/questions/49204024/