python - Pyspark 展平 RDD 错误::太多值无法解压

标签 python apache-spark pyspark rdd

我正在尝试展平 RDD 中的数据。 RDD 的结构为一个 4 元组列表,其中第一个元素 - Primary_id ,第二个元素 - 字典列表,第三个和第四个元素各自包含一个包含字典的列表。

rdd=   [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
                  [{'pol_cat_id':'234','pol_dt':'20100220'}],
                  [{'qor_pol_id':'23492','qor_cd':'30'}]),

     ('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
                  [{'pol_cat_id':'532','pol_dt':'20091020'}],
                  [{'qor_pol_id':'49320','qor_cd':'21'}]) ]

我想展平数据,使其以格式显示

enter image description here

我该如何在 Pyspark 中执行此操作?

这是我尝试过的,但这给了我一个错误:太多元组无法解压

def flatten_map(record):
    try:
        yield(record)
        # Unpack items
        id, items, line, pls = record
        pol_id = pls["pol_cat_id"]
        pol_dt = pls["pol_dt"]
        qor_id = pls["qor_pol_id"]
        for item in items:
            yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
    except Exception as e:
        pass

 result = (rdd
    # Expand data
    .flatMap(flatten_map)
    # Flatten tuples
    .map(lambda x: x[0], ))

如果需要,我可以发布完整的错误,但为了简洁起见,

ValueError: too many values to unpack (expected 2)

注意:转换为 pandas 不起作用,因为 RDD 太大

最佳答案

IIUC,您可以通过使用列表理解来迭代 4 项元组(1 个字符串 + 3 个列表)的第 2 项来运行 flatMap(),例如:

from pyspark.sql import Row

myrdd = sc.parallelize(rdd)

myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ).collect()
#[({'primary_id': 'xxxxx99'},
#  {'cov_id': 'Q', 'cov_cd': '100', 'cov_amt': '100', 'cov_state': 'AZ'},
#  {'pol_cat_id': '234', 'pol_dt': '20100220'},
#  {'qor_pol_id': '23492', 'qor_cd': '30'}),
# ......

简短说明:在flatMap函数的列表推导中,除了迭代第二项x[1] (如 z 这是一本字典),我还将第一个字符串项 x[0] 转换为包含一个条目的字典: {"primary_id":x[0]}并取 x[2]x[3] 的第一项,它们都是字典。

因此,运行上面的 flatMap 函数后,RDD 元素就变成了 4 个字典的元组,接下来你需要做的就是合并它们。下面是我的示例代码,用于将 4 个字典的元组映射到 Row 对象,您可能需要更改如何处理异常和缺失字段的逻辑以满足您自己的要求。

cols = ['primary_id', 'cov_id', 'cov_cd', 'cov_amt', 'cov_state', 'pol_cat_id', 'pol_dt', 'qor_pol_id', 'qor_cd']

def merge_dict(arr, cols):
  row = {}
  try:
    for e in arr:
      if type(e) is dict: row.update(e)
  except:
    pass
  finally:
    return Row(**dict({ c:row.get(c, None) for c in cols })) if row else None

myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ) \
   .map(lambda x: merge_dict(x, cols)) \
   .filter(bool) \
   .toDF() \
   .show()
+-------+------+------+---------+----------+--------+----------+------+----------+
|cov_amt|cov_cd|cov_id|cov_state|pol_cat_id|  pol_dt|primary_id|qor_cd|qor_pol_id|
+-------+------+------+---------+----------+--------+----------+------+----------+
|    100|   100|     Q|       AZ|       234|20100220|   xxxxx99|    30|     23492|
|    200|    33|     Q|       AZ|       234|20100220|   xxxxx99|    30|     23492|
|     10|    64|     Q|       AZ|       234|20100220|   xxxxx99|    30|     23492|
|    100|    20|     R|       TX|       532|20091020|   xxxxx86|    21|     49320|
|    500|    44|     R|       TX|       532|20091020|   xxxxx86|    21|     49320|
|     50|    66|     R|       TX|       532|20091020|   xxxxx86|    21|     49320|
+-------+------+------+---------+----------+--------+----------+------+----------+

顺便说一句。如果您想让原来的函数正常工作,请检查以下包含 #<-- 的 5 行。 :

def flatten_map(record): 
  try: 
    #yield(record)    #<-- comment this out, no need unprocessed data in output
    # Unpack items 
    id, items, line, pls = record 
    pol_id = line[0]["pol_cat_id"]      #<-- from line[0] not pls
    pol_dt = line[0]["pol_dt"]          #<-- from line[0] not pls
    qor_id = pls[0]["qor_pol_id"]       #<-- from pls[0] not pls
    for item in items: 
      #<-- below line removed the ending ", 1", thus no need the last map() function to flatten tuples
      yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id)
  except Exception as e: 
    pass 

关于python - Pyspark 展平 RDD 错误::太多值无法解压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58091885/

相关文章:

python - 文本大小变化持久性

未调用python cmd completedefault()方法

python - BeautifulSoup:删除小部件

apache-spark - CassandraDaemon.java :195 - Exception in thread Thread[MemtableFlushWriter:10, 5,主要]

pyspark - 如何添加一列指示磁盘上文件的行号?

python - 多个 RDD 的 Spark union

apache-spark - 如何将字符串冒号分隔的列转换为 MapType?

python - 如何使用 Python tkinter 设置消息框的字体?

python - 将 RDD 写入 PySpark 中的多个文件

java - Spark - 如何使用给定权限写入文件