我正在尝试展平 RDD 中的数据。 RDD 的结构为一个 4 元组列表,其中第一个元素 - Primary_id ,第二个元素 - 字典列表,第三个和第四个元素各自包含一个包含字典的列表。
rdd= [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
{'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
{'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
[{'pol_cat_id':'234','pol_dt':'20100220'}],
[{'qor_pol_id':'23492','qor_cd':'30'}]),
('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
{'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
{'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
[{'pol_cat_id':'532','pol_dt':'20091020'}],
[{'qor_pol_id':'49320','qor_cd':'21'}]) ]
我想展平数据,使其以格式显示
我该如何在 Pyspark 中执行此操作?
这是我尝试过的,但这给了我一个错误:太多元组无法解压
def flatten_map(record):
try:
yield(record)
# Unpack items
id, items, line, pls = record
pol_id = pls["pol_cat_id"]
pol_dt = pls["pol_dt"]
qor_id = pls["qor_pol_id"]
for item in items:
yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
except Exception as e:
pass
result = (rdd
# Expand data
.flatMap(flatten_map)
# Flatten tuples
.map(lambda x: x[0], ))
如果需要,我可以发布完整的错误,但为了简洁起见,
ValueError: too many values to unpack (expected 2)
注意:转换为 pandas 不起作用,因为 RDD 太大
最佳答案
IIUC,您可以通过使用列表理解来迭代 4 项元组(1 个字符串 + 3 个列表)的第 2 项来运行 flatMap(),例如:
from pyspark.sql import Row
myrdd = sc.parallelize(rdd)
myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ).collect()
#[({'primary_id': 'xxxxx99'},
# {'cov_id': 'Q', 'cov_cd': '100', 'cov_amt': '100', 'cov_state': 'AZ'},
# {'pol_cat_id': '234', 'pol_dt': '20100220'},
# {'qor_pol_id': '23492', 'qor_cd': '30'}),
# ......
简短说明:在flatMap函数的列表推导中,除了迭代第二项x[1]
(如 z
这是一本字典),我还将第一个字符串项 x[0] 转换为包含一个条目的字典: {"primary_id":x[0]}
并取 x[2] 和 x[3] 的第一项,它们都是字典。
因此,运行上面的 flatMap 函数后,RDD 元素就变成了 4 个字典的元组,接下来你需要做的就是合并它们。下面是我的示例代码,用于将 4 个字典的元组映射到 Row 对象,您可能需要更改如何处理异常和缺失字段的逻辑以满足您自己的要求。
cols = ['primary_id', 'cov_id', 'cov_cd', 'cov_amt', 'cov_state', 'pol_cat_id', 'pol_dt', 'qor_pol_id', 'qor_cd']
def merge_dict(arr, cols):
row = {}
try:
for e in arr:
if type(e) is dict: row.update(e)
except:
pass
finally:
return Row(**dict({ c:row.get(c, None) for c in cols })) if row else None
myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ) \
.map(lambda x: merge_dict(x, cols)) \
.filter(bool) \
.toDF() \
.show()
+-------+------+------+---------+----------+--------+----------+------+----------+
|cov_amt|cov_cd|cov_id|cov_state|pol_cat_id| pol_dt|primary_id|qor_cd|qor_pol_id|
+-------+------+------+---------+----------+--------+----------+------+----------+
| 100| 100| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 200| 33| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 10| 64| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 100| 20| R| TX| 532|20091020| xxxxx86| 21| 49320|
| 500| 44| R| TX| 532|20091020| xxxxx86| 21| 49320|
| 50| 66| R| TX| 532|20091020| xxxxx86| 21| 49320|
+-------+------+------+---------+----------+--------+----------+------+----------+
顺便说一句。如果您想让原来的函数正常工作,请检查以下包含 #<--
的 5 行。 :
def flatten_map(record):
try:
#yield(record) #<-- comment this out, no need unprocessed data in output
# Unpack items
id, items, line, pls = record
pol_id = line[0]["pol_cat_id"] #<-- from line[0] not pls
pol_dt = line[0]["pol_dt"] #<-- from line[0] not pls
qor_id = pls[0]["qor_pol_id"] #<-- from pls[0] not pls
for item in items:
#<-- below line removed the ending ", 1", thus no need the last map() function to flatten tuples
yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id)
except Exception as e:
pass
关于python - Pyspark 展平 RDD 错误::太多值无法解压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58091885/