我有一个包含不同时间周期(1/6、3/6、6/6 等)列的 DF,我想“分解”所有列以创建一个新的 DF,其中每一行都是1/6 周期。
from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, arrays_zip, col
spark = SparkSession.builder \
.appName('DataFrame') \
.master('local[*]') \
.getOrCreate()
df = spark.createDataFrame([Row(a=1, b=[1, 2, 3, 4, 5, 6], c=[11, 22, 33], d=['foo'])])
| a| b| c| d|
+---+------------------+------------+-----+
| 1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|
+---+------------------+------------+-----+
我在做爆炸:
df2 = (df.withColumn("tmp", arrays_zip("b", "c", "d"))
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.b"), col("tmp.c"), "d"))
但输出不是我想要的:
| a| b| c| d|
+---+---+----+-----+
| 1| 1| 11|[foo]|
| 1| 2| 22|[foo]|
| 1| 3| 33|[foo]|
| 1| 4|null|[foo]|
| 1| 5|null|[foo]|
| 1| 6|null|[foo]|
+---+---+----+-----+
我希望它看起来像这样:
| a| b| c| d|
+---+---+---+---+
| 1| 1| 11|foo|
| | 2| | |
| | 3| 22| |
| | 4| | |
| | 5| 33| |
| | 6| | |
+---+---+---+---+
我是 Spark 的新手,从一开始我就遇到了复杂的主题! :)
2019-07-15 更新:也许有人有不使用 UDF 的解决方案? -> @jxc 回答
2019-07-17 更新:也许有人有解决方案如何以更复杂的顺序更改空 <-> 值序列?就像在 c
- Null, 11, Null, 22, Null, 33
或更复杂的情况下,我们希望 d
列中的第一个值是Null
,下一个 foo
然后是 Null, Null, Null
:
| a| b| c| d|
+---+---+---+---+
| 1| 1| | |
| | 2| 11|foo|
| | 3| | |
| | 4| 22| |
| | 5| | |
| | 6| 33| |
+---+---+---+---+
最佳答案
这是一种不使用 udf 的方法:
更新于 2019/07/17: 调整了 SQL stmt 并将 N=6 作为参数添加到 SQL。
2019 年 7 月 16 日更新: 删除了临时列 t
, 替换为常量 array(0,1,2,3,4,5)
在转换函数中。在这种情况下,我们可以直接对数组元素的值进行操作,而不是对它们的索引进行操作。
更新:我删除了原来使用String函数并将数组元素中的数据类型全部转换为String且效率较低的方法。 Spark 2.4+ 的 Spark SQL 高阶函数应该比原来的方法更好。
设置
from pyspark.sql import functions as F, Row
df = spark.createDataFrame([ Row(a=1, b=[1, 2, 3, 4, 5, 6], c=['11', '22', '33'], d=['foo'], e=[111,222]) ])
>>> df.show()
+---+------------------+------------+-----+----------+
| a| b| c| d| e|
+---+------------------+------------+-----+----------+
| 1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|[111, 222]|
+---+------------------+------------+-----+----------+
# columns you want to do array-explode
cols = df.columns
# number of array elements to set
N = 6
使用 SQL 高阶函数:transform
使用Spark SQL高阶函数:transform(),执行如下操作:
创建以下 Spark SQL 代码,其中
{0}
将被 column_name 替换,{1}
将替换为N
:stmt = ''' CASE WHEN '{0}' in ('d') THEN transform(sequence(0,{1}-1), x -> IF(x == 1, `{0}`[0], NULL)) WHEN size(`{0}`) <= {1}/2 AND size(`{0}`) > 1 THEN transform(sequence(0,{1}-1), x -> IF(((x+1)*size(`{0}`))%{1} == 0, `{0}`[int((x-1)*size(`{0}`)/{1})], NULL)) ELSE `{0}` END AS `{0}` '''
注意: 数组转换仅在数组包含多个(除非在单独的
WHEN
子句中指定)和<= N/2
时定义 元素(在本例中为1 < size <= 3
)。其他大小的数组将保持原样。使用 selectExpr() 为所有需要的列运行上面的 SQL
df1 = df.withColumn('a', F.array('a')) \ .selectExpr(*[ stmt.format(c,N) for c in cols ]) >>> df1.show() +---+------------------+----------------+-----------+---------------+ | a| b| c| d| e| +---+------------------+----------------+-----------+---------------+ |[1]|[1, 2, 3, 4, 5, 6]|[, 11,, 22,, 33]|[, foo,,,,]|[,, 111,,, 222]| +---+------------------+----------------+-----------+---------------+
运行arrays_zip 和分解:
df_new = df1.withColumn('vals', F.explode(F.arrays_zip(*cols))) \ .select('vals.*') \ .fillna('', subset=cols) >>> df_new.show() +----+---+---+---+----+ | a| b| c| d| e| +----+---+---+---+----+ | 1| 1| | |null| |null| 2| 11|foo|null| |null| 3| | | 111| |null| 4| 22| |null| |null| 5| | |null| |null| 6| 33| | 222| +----+---+---+---+----+
注意:
fillna('', subset=cols)
仅更改包含字符串的列
在一个方法链中:
df_new = df.withColumn('a', F.array('a')) \
.selectExpr(*[ stmt.format(c,N) for c in cols ]) \
.withColumn('vals', F.explode(F.arrays_zip(*cols))) \
.select('vals.*') \
.fillna('', subset=cols)
转换函数的解释:
转换函数(下面列出,反射(reflect)了需求的旧版本)
transform(sequence(0,5), x -> IF((x*size({0}))%6 == 0, {0}[int(x*size({0})/6)], NULL))
如帖子中所述, {0}
将替换为列名。这里我们使用column- c
以包含 3 个元素为例:
- 在转换函数中,
sequence(0,5)
创建常量数组array(0,1,2,3,4,5)
有 6 个元素,其余的用一个参数设置 lambda 函数x
具有元素的值(value)。 - IF(condition, true_value, false_value):是一个标准的SQL函数
我们应用的条件是:
(x*size(c))%6 == 0
其中size(c)=3
,如果这个条件为真,它将返回c[int(x*size(c)/6)],否则返回NULL。所以x
从 0 到 5,我们将有:((0*3)%6)==0) true --> c[int(0*3/6)] = c[0] ((1*3)%6)==0) false --> NULL ((2*3)%6)==0) true --> c[int(2*3/6)] = c[1] ((3*3)%6)==0) false --> NULL ((4*3)%6)==0) true --> c[int(4*3/6)] = c[2] ((5*3)%6)==0) false --> NULL
类似于包含 2 元素数组的 column-e。
关于python - 如何对多列、不同类型、不同长度进行爆破?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56931256/