python - 如何对多列、不同类型、不同长度进行爆破?

标签 python pyspark

我有一个包含不同时间周期(1/6、3/6、6/6 等)列的 DF,我想“分解”所有列以创建一个新的 DF,其中每一行都是1/6 周期。

from pyspark import Row 
from pyspark.sql import SparkSession 
from pyspark.sql.functions import explode, arrays_zip, col

spark = SparkSession.builder \
    .appName('DataFrame') \
    .master('local[*]') \
    .getOrCreate()

df = spark.createDataFrame([Row(a=1, b=[1, 2, 3, 4, 5, 6], c=[11, 22, 33], d=['foo'])])

|  a|                 b|           c|    d|
+---+------------------+------------+-----+
|  1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|
+---+------------------+------------+-----+

我在做爆炸:

df2 = (df.withColumn("tmp", arrays_zip("b", "c", "d"))
       .withColumn("tmp", explode("tmp"))
       .select("a", col("tmp.b"), col("tmp.c"), "d"))

但输出不是我想要的:

|  a|  b|   c|    d|
+---+---+----+-----+
|  1|  1|  11|[foo]|
|  1|  2|  22|[foo]|
|  1|  3|  33|[foo]|
|  1|  4|null|[foo]|
|  1|  5|null|[foo]|
|  1|  6|null|[foo]|
+---+---+----+-----+

我希望它看起来像这样:

|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1| 11|foo|
|   |  2|   |   |
|   |  3| 22|   |
|   |  4|   |   |
|   |  5| 33|   |
|   |  6|   |   |
+---+---+---+---+

我是 Spark 的新手,从一开始我就遇到了复杂的主题! :)

2019-07-15 更新:也许有人有不使用 UDF 的解决方案? -> @jxc 回答

2019-07-17 更新:也许有人有解决方案如何以更复杂的顺序更改空 <-> 值序列?就像在 c - Null, 11, Null, 22, Null, 33 或更复杂的情况下,我们希望 d 列中的第一个值是Null,下一个 foo 然后是 Null, Null, Null:

|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1|   |   |
|   |  2| 11|foo|
|   |  3|   |   |
|   |  4| 22|   |
|   |  5|   |   |
|   |  6| 33|   |
+---+---+---+---+

最佳答案

这是一种不使用 udf 的方法:

更新于 2019/07/17: 调整了 SQL stmt 并将 N=6 作为参数添加到 SQL。

2019 年 7 月 16 日更新: 删除了临时列 t , 替换为常量 array(0,1,2,3,4,5)转换函数中。在这种情况下,我们可以直接对数组元素的值进行操作,而不是对它们的索引进行操作。

更新:我删除了原来使用String函数并将数组元素中的数据类型全部转换为String且效率较低的方法。 Spark 2.4+ 的 Spark SQL 高阶函数应该比原来的方法更好。

设置

from pyspark.sql import functions as F, Row

df = spark.createDataFrame([ Row(a=1, b=[1, 2, 3, 4, 5, 6], c=['11', '22', '33'], d=['foo'], e=[111,222]) ])

>>> df.show()
+---+------------------+------------+-----+----------+
|  a|                 b|           c|    d|         e|
+---+------------------+------------+-----+----------+
|  1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|[111, 222]|
+---+------------------+------------+-----+----------+

# columns you want to do array-explode
cols = df.columns

# number of array elements to set
N = 6

使用 SQL 高阶函数:transform

使用Spark SQL高阶函数:transform(),执行如下操作:

  1. 创建以下 Spark SQL 代码,其中 {0} 将被 column_name 替换, {1} 将替换为 N :

    stmt = '''
       CASE
          WHEN '{0}' in ('d') THEN
            transform(sequence(0,{1}-1), x -> IF(x == 1, `{0}`[0], NULL))
          WHEN size(`{0}`) <= {1}/2 AND size(`{0}`) > 1 THEN
            transform(sequence(0,{1}-1), x -> IF(((x+1)*size(`{0}`))%{1} == 0, `{0}`[int((x-1)*size(`{0}`)/{1})], NULL))
          ELSE `{0}`
        END AS `{0}`
    '''
    

    注意: 数组转换仅在数组包含多个(除非在单独的 WHEN 子句中指定)和 <= N/2 时定义 元素(在本例中为 1 < size <= 3 )。其他大小的数组将保持原样。

  2. 使用 selectExpr() 为所有需要的列运行上面的 SQL

    df1 = df.withColumn('a', F.array('a')) \
            .selectExpr(*[ stmt.format(c,N) for c in cols ])
    
    >>> df1.show()
    +---+------------------+----------------+-----------+---------------+
    |  a|                 b|               c|          d|              e|
    +---+------------------+----------------+-----------+---------------+
    |[1]|[1, 2, 3, 4, 5, 6]|[, 11,, 22,, 33]|[, foo,,,,]|[,, 111,,, 222]|
    +---+------------------+----------------+-----------+---------------+
    
  3. 运行arrays_zip分解:

    df_new = df1.withColumn('vals', F.explode(F.arrays_zip(*cols))) \
                .select('vals.*') \
                .fillna('', subset=cols)
    
    >>> df_new.show()
    +----+---+---+---+----+
    |   a|  b|  c|  d|   e|
    +----+---+---+---+----+
    |   1|  1|   |   |null|
    |null|  2| 11|foo|null|
    |null|  3|   |   | 111|
    |null|  4| 22|   |null|
    |null|  5|   |   |null|
    |null|  6| 33|   | 222|
    +----+---+---+---+----+
    

    注意:fillna('', subset=cols)仅更改包含字符串的列

在一个方法链中:

df_new = df.withColumn('a', F.array('a')) \
           .selectExpr(*[ stmt.format(c,N) for c in cols ]) \
           .withColumn('vals', F.explode(F.arrays_zip(*cols))) \
           .select('vals.*') \
           .fillna('', subset=cols)

转换函数的解释:

转换函数(下面列出,反射(reflect)了需求的旧版本)

transform(sequence(0,5), x -> IF((x*size({0}))%6 == 0, {0}[int(x*size({0})/6)], NULL))

如帖子中所述, {0} 将替换为列名。这里我们使用column- c以包含 3 个元素为例:

  • 在转换函数中,sequence(0,5)创建常量数组 array(0,1,2,3,4,5)有 6 个元素,其余的用一个参数设置 lambda 函数 x具有元素的值(value)。
  • IF(condition, true_value, false_value):是一个标准的SQL函数
  • 我们应用的条件是: (x*size(c))%6 == 0 其中size(c)=3 ,如果这个条件为真,它将返回c[int(x*size(c)/6)],否则返回NULL。所以 x从 0 到 5,我们将有:

    ((0*3)%6)==0) true   -->  c[int(0*3/6)] = c[0]
    ((1*3)%6)==0) false  -->  NULL
    ((2*3)%6)==0) true   -->  c[int(2*3/6)] = c[1]
    ((3*3)%6)==0) false  -->  NULL
    ((4*3)%6)==0) true   -->  c[int(4*3/6)] = c[2]
    ((5*3)%6)==0) false  -->  NULL
    

类似于包含 2 元素数组的 column-e。

关于python - 如何对多列、不同类型、不同长度进行爆破?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56931256/

相关文章:

apache-spark - 连接后如何重命名重复的列?

apache-spark - 在 PySpark 中读取文件在读取整个目录然后过滤和读取目录的一部分之间有什么区别?

python - 在pycharm中连接html文件和css文件

具有多个 'for' 子句和单个 'if' 的 python 理解

python - Pandas 注释数据框历史

csv - 不完整的 HDFS URI,没有主机

azure - Pyspark - 基于数据帧创建包含所有组合的 json 结构

python - Pyspark - 将 datetime.time 函数应用于数据帧的所有行

Python 使用 Pandas/Urllib 下载文件

python - Theano 中的切片和索引