python-3.x - 如何实现自定义 Pyspark 爆炸(用于结构数组),4 列合 1 爆炸?

标签 python-3.x apache-spark pyspark apache-spark-sql

我正在尝试在 Pyspark 中实现自定义爆炸。我有 4 列,它们是具有几乎相同架构的结构数组(一列结构包含的字段比其他三列少一个)。

对于我的 DataFrame 中的每一行,我有 4 列是结构数组。列是学生、助教、教师、管理员。

students、teaching_assistants 和 teachers 是具有字段 idstudent_levelname 的结构数组。

例如,这是 DataFrame 中的示例行。

enter image description here

students、teaching_assistants 和 teachers 结构都具有相同的模式(“id”、“student_level”、“name”),而 administrators 结构具有“id”和“name”字段但缺少学生级别。

我想执行自定义展开,以便在每一行中为每个学生、助教、教授和管理员提供一个条目以及原始列名称,以防我必须按“人员类型”进行搜索。 所以对于上面一行的屏幕截图,输出将是 8 行:

+-----------+---------------------+----+---------------+----------+
| School_id |        type         | id | student_level |   name   |
+-----------+---------------------+----+---------------+----------+
|      1999 | students            |  1 | 0             | Brian    |
|      1999 | students            |  9 | 2             | Max      |
|      1999 | teaching_assistants | 19 | 0             | Xander   |
|      1999 | teachers            | 21 | 0             | Charlene |
|      1999 | teachers            | 12 | 2             | Rob      |
|      1999 | administrators      | 23 | None          | Marsha   |
|      1999 | administrators      | 11 | None          | Ryan     |
|      1999 | administrators      | 14 | None          | Bob      |
+-----------+---------------------+----+---------------+----------+

对于管理员来说,student_level 列将为空。问题是如果我使用 explode 函数,我最终会将所有这些项目放在不同的列中。

是否可以在 Pyspark 中完成此操作?我的一个想法是弄清楚如何将 4 个数组列组合成 1 个数组,然后对数组进行分解,尽管我不确定组合结构数组并将列名作为字段是否可行(我'我已经尝试过各种方法)而且我也不知道如果管理员遗漏了一个字段是否可行。

过去,我通过转换为 RDD 并使用平面图/自定义 udf 来完成此操作,但对于数百万行来说效率非常低。

最佳答案

想法是使用stack转换列 students , teaching_assistants , teachersadministrators分成单独的行,每个行都有正确的值 type .之后,可以分解包含数据的列,然后将单个结构的元素转换为单独的列。

使用 stack要求堆叠的所有列都具有相同的类型。这意味着所有列必须包含相同结构的数组,并且该结构的所有元素的可空性必须匹配。因此 administrators列必须首先转换为正确的结构类型。

df.withColumn("administrators", F.expr("transform(administrators, " +
        "a -> if(1<2,named_struct('id', a.id, 'name', a.name, 'student_level', "+
              "cast(null as long)),null))"))\
  .select("School_id", F.expr("stack(4, 'students', students, "+
          "'teaching_assistants', teaching_assistants, 'teachers', teachers, "+
          "'administrators', administrators) as (type, temp1)")) \
  .withColumn("temp2", F.explode("temp1"))\
  .select("School_id", "type", "temp2.id", "temp2.name", "temp2.student_level")\
  .show()

打印

+---------+-------------------+---+--------+-------------+
|School_id|               type| id|    name|student_level|
+---------+-------------------+---+--------+-------------+
|     1999|           students|  1|   Brian|            0|
|     1999|           students|  9|     Max|            2|
|     1999|teaching_assistants| 19|  Xander|            0|
|     1999|           teachers| 21|Charlene|            0|
|     1999|           teachers| 12|     Rob|            2|
|     1999|     administrators| 23|  Marsha|         null|
|     1999|     administrators| 11|    Ryan|         null|
|     1999|     administrators| 14|     Bob|         null|
+---------+-------------------+---+--------+-------------+

奇怪的样子if(1<2, named_struct(...), null)在第一行中,必须为 administrators 的元素设置正确的可空性数组。

此解决方案适用于 Spark 2.4+。如果有可能 transform administrators struct 在上一步中,此解决方案也适用于早期版本。

关于python-3.x - 如何实现自定义 Pyspark 爆炸(用于结构数组),4 列合 1 爆炸?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64216766/

相关文章:

python - 如何用Python获取POST参数?

python - 如何在Python中用n个字符填充字符串以使其达到一定长度

python-3.x - 我如何Dockeries一个包含spark依赖项的python脚本?

python - Spark Dataframe 是否具有与 Panda 的合并指示器等效的选项?

python - 在 Python 3 中导入 numba 时出错

django - 一个 BeautifulSoup 文档可以使用多个过滤器吗?

apache-spark - 在集群模式下通过spark-submit共享配置文件

java - 在 ubuntu 上安装配置单元(derby 有问题?)

apache-spark - 得到执行失败 : HTTP 403 when submit spark on k8s

python - 为什么我的 Spark DataFrame 比 RDD 慢很多?