dataframe - 如何按列值在pyspark df中添加更多行

标签 dataframe apache-spark pyspark user-defined-functions flatmap

我被这个问题困扰了很长一段时间,可能使它变得比实际情况更大。我会尽量简化它。

我在我的代码中使用了 pyspark 和数据框函数。

我已经有一个 df 了:

+--+-----+---------+
|id|col1 |col2     |
+--+-----+---------+
|1 |Hello|Repeat   |
|2 |Word |Repeat   |
|3 |Aux  |No repeat|
|4 |Test |Repeat   |
+--+-----+---------+

我想要实现的是在 col2 为“重复”时重复 df 的行,增加 col1 在 value+1 中的值。

+--+-----+---------+------+
|id|col1 |col2     |col3  |
+--+-----+---------+------+
|1 |Hello|Repeat   |Hello1|
|1 |Hello|Repeat   |Hello2|
|1 |Hello|Repeat   |Hello3|
|2 |Word |Repeat   |Word1 |
|2 |Word |Repeat   |Word2 |
|2 |Word |Repeat   |Word3 |
|3 |Aux  |No repeat|Aux   |
|4 |Test |Repeat   |Test1 |
|4 |Test |Repeat   |Test2 |
|4 |Test |Repeat   |Test3 |
+--+-----+---------+------+

我的第一种方法是在 udf 的帮助下使用 withColumn 运算符创建一个新列:

my_func = udf(lambda words: (words + str(i + 1 for i in range(3))), StringType())
df = df\
    .withColumn('col3', when(col('col2') == 'No Repeat', col('col1'))
                            .otherwise(my_func(col('col1'))))

但是当我在 df.show(10,False) 中评估它时,它抛出了一个错误。我的猜测是因为我无法以这种方式使用 withColumn 函数创建更多行。

所以我决定采用另一种方法,但也没有成功。使用 rdd.flatMap:

test = df.rdd.flatMap(lambda row: (row if (row.col2== 'No Repeat') else (row.col1 + str(i+1) for i in range(3))))
print(test.collect())

但在这里我丢失了 df 模式 并且我不能在 else 条件下抛出整行,它只抛出 col1 单词加上它的迭代器 .

你知道解决这个问题的正确方法吗?

最后,我的问题是我没有找到一种正确的方法来根据列值创建更多行,因为我对这个世界还很陌生。我发现的答案似乎也不适合这个问题。

我们将不胜感激。

最佳答案

一种方法是使用条件并分配一个数组,然后展开,

import pyspark.sql.functions as F

(df.withColumn("test",F.when(df['col2']=='Repeat',
       F.array([F.lit(str(i)) for i in range(1,4)])).otherwise(F.array(F.lit(''))))
  .withColumn("col3",F.explode(F.col("test"))).drop("test")
  .withColumn("col3",F.concat(F.col("col1"),F.col("col3")))).show()

@MohammadMurtazaHashmi 建议的更整洁的版本如下所示:

(df.withColumn("test",F.when(df['col2']=='Repeat',
     F.array([F.concat(F.col("col1"),F.lit(str(i))) for i in range(1,4)]))
    .otherwise(F.array(F.col("col1"))))
    .select("id","col1","col2", F.explode("test"))).show()

+---+-----+---------+------+
| id| col1|     col2|  col3|
+---+-----+---------+------+
|  1|Hello|   Repeat|Hello1|
|  1|Hello|   Repeat|Hello2|
|  1|Hello|   Repeat|Hello3|
|  2| Word|   Repeat| Word1|
|  2| Word|   Repeat| Word2|
|  2| Word|   Repeat| Word3|
|  3|  Aux|No repeat|   Aux|
|  4| Test|   Repeat| Test1|
|  4| Test|   Repeat| Test2|
|  4| Test|   Repeat| Test3|
+---+-----+---------+------+

关于dataframe - 如何按列值在pyspark df中添加更多行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61248099/

相关文章:

apache-spark - 如何使用pyspark将数据帧保存在 ".txt"文件中

scala - 添加包含按 df 分组的列数 og 的列

python - Pyspark:解析一列 json 字符串

python - 限制每个唯一 pyspark 数据帧列值返回的行,无需循环

python - pandas.DataFrame 中重复列的有趣结果

python - 如果 dtype 是类别(MemoryError),则 pivot_table 需要更多内存

python - 如何将新列添加到 pandas df 中,该列从另一个数据框中返回同一组中较大的最小值

python - pandas 在每个组中找到满足特定条件的行的索引并为这些行分配值

python - 在 Spark 中应用具有非恒定帧大小的窗口函数

python - 如何在 PySpark/Python 中有效地将数组转换为字符串