apache-spark - 通过基于条件连接另一列的值来创建新的 pyspark DataFrame 列

标签 apache-spark pyspark apache-spark-sql

我在 pyspark 中有一个数据框,如下所示

df.show()

+-------+--------------------+--------------------+
| Dev_No|               model|              Tested|
+-------+--------------------+--------------------+
|BTA16C5|          Windows PC|                   N|
|BTA16C5|                 SRL|                   N|
|BTA16C5|     Hewlett Packard|                   N|
|CTA16C5|     Android Devices|                   Y|
|CTA16C5|     Hewlett Packard|                   N|
|4MY16A5|               Other|                   N|
|4MY16A5|               Other|                   N|
|4MY16A5|              Tablet|                   Y|
|4MY16A5|               Other|                   N|
|4MY16A5|           Cable STB|                   Y|
|4MY16A5|               Other|                   N|
|4MY16A5|          Windows PC|                   Y|
|4MY16A5|          Windows PC|                   Y|
|4MY16A5|         Smart Watch|                   Y|
+-------+--------------------+--------------------+

现在使用上面的数据框,我想使用名为 Tested_devicesnewcolumn 创建下面的数据框,并用每个 Dev_No< 的值填充该列 选择 model,其中 TestedY 并以逗号分隔填充所有值。

df1.show()

+-------+--------------------+--------------------+------------------------------------------------------+
| Dev_No|               model|              Tested|                                        Tested_devices|
+-------+--------------------+--------------------+------------------------------------------------------+
|BTA16C5|          Windows PC|                   N|                                                      |
|BTA16C5|                 SRL|                   N|                                                      |  
|BTA16C5|     Hewlett Packard|                   N|                                                      |
|CTA16C5|     Android Devices|                   Y|                                       Android Devices|
|CTA16C5|     Hewlett Packard|                   N|                                                      |      
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|              Tablet|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch| 
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|           Cable STB|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|          Windows PC|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5|          Windows PC|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5|         Smart Watch|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
+-------+--------------------+--------------------+------------------------------------------------------+

我尝试了类似下面的方法来选择Dev_Nomodel,其中TestedY

a = df.select("Dev_No", "model"), when(df.Tested == 'Y')

我无法得到结果。它给了我以下错误

TypeError: when() takes exactly 2 arguments (1 given)

怎样才能实现我想要的

最佳答案

更新

对于 Spark 1.6,您将需要一种替代方法。在不使用 udf 或任何 Window 函数的情况下执行此操作的一种方法是使用收集的值创建第二个临时 DataFrame,然后将其连接回原始 DataFrame。

首先按两者Dev_NoTested进行分组,并使用concat_wscollect_list进行聚合>。聚合后,仅过滤测试设备的 DataFrame。

import pyspark.sql.functions as f

# create temporary DataFrame
df2 = df.groupBy('Dev_No', 'Tested')\
    .agg(f.concat_ws(", ", f.collect_list('model')).alias('Tested_devices'))\
    .where(f.col('Tested') == 'Y')

df2.show(truncate=False)
#+-------+------+------------------------------------------------------+
#|Dev_No |Tested|Tested_devices                                        |
#+-------+------+------------------------------------------------------+
#|CTA16C5|Y     |Android Devices                                       |
#|4MY16A5|Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+------+------------------------------------------------------+

现在使用 Dev_NoTested 列作为连接,将 dfdf2 进行左连接键:

df.join(df2, on=['Dev_No', 'Tested'], how='left')\
    .select('Dev_No', 'model', 'Tested', 'Tested_devices')\
    .show(truncate=False)

最后使用 select 的目的是为了显示目的而以与原始 DataFrame 相同的顺序获取列 - 如果您选择,可以删除此步骤。

这将产生以下输出(与下面的输出相同(使用 concat_ws):

#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model          |Tested|Tested_devices                                        |
#+-------+---------------+------+------------------------------------------------------+
#|4MY16A5|Other          |N     |null                                                  |
#|4MY16A5|Other          |N     |null                                                  |
#|4MY16A5|Other          |N     |null                                                  |
#|4MY16A5|Other          |N     |null                                                  |
#|CTA16C5|Hewlett Packard|N     |null                                                  |
#|BTA16C5|Windows PC     |N     |null                                                  |
#|BTA16C5|SRL            |N     |null                                                  |
#|BTA16C5|Hewlett Packard|N     |null                                                  |
#|CTA16C5|Android Devices|Y     |Android Devices                                       |
#|4MY16A5|Tablet         |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Cable STB      |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch    |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+---------------+------+------------------------------------------------------+

原始答案:(适用于 Spark 的更高版本)

您可以通过使用两个 pyspark.sql.functions.when() 来实现此目的语句 - 其中之一位于对 pyspark.sql.functions.collect_list() 的调用中超过Window ,利用默认 nulldoes not get added to the list 的事实:

from pyspark.sql import Window
import pyspark.sql.functions as f

df.select(
    "*",
    f.when(
        f.col("Tested") == "Y",
        f.collect_list(
            f.when(
                f.col("Tested") == "Y",
                f.col('model')
            )
        ).over(Window.partitionBy("Dev_No"))
    ).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+--------------------------------------------------------+
#|Dev_No |model          |Tested|Tested_devices                                          |
#+-------+---------------+------+--------------------------------------------------------+
#|BTA16C5|Windows PC     |N     |null                                                    |
#|BTA16C5|SRL            |N     |null                                                    |
#|BTA16C5|Hewlett Packard|N     |null                                                    |
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Tablet         |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Cable STB      |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Windows PC     |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Windows PC     |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Smart Watch    |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|CTA16C5|Android Devices|Y     |[Android Devices]                                       |
#|CTA16C5|Hewlett Packard|N     |null                                                    |
#+-------+---------------+------+--------------------------------------------------------+

如果您希望输出与问题中显示的完全相同 - 作为逗号分隔值的字符串而不是列表和空字符串而不是 null - 您可以稍微修改一下,如下所示:

使用pyspark.sql.functions.concat_wscollect_list 的输出连接成一个字符串。我使用 ", " 作为分隔符。这相当于在 python 中执行 ", ".join(some_list) 。接下来,我们将 .otherwise(f.lit("")) 添加到外部 when() 调用的末尾,以指定我们要返回文字空如果条件为 False,则为字符串。

df.select(
    "*",
    f.when(
        f.col("Tested") == "Y",
        f.concat_ws(
            ", ",
            f.collect_list(
                f.when(
                    f.col("Tested") == "Y",
                    f.col('model')
                )
            ).over(Window.partitionBy("Dev_No"))
        )
    ).otherwise(f.lit("")).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model          |Tested|Tested_devices                                        |
#+-------+---------------+------+------------------------------------------------------+
#|BTA16C5|Windows PC     |N     |                                                      |
#|BTA16C5|SRL            |N     |                                                      |
#|BTA16C5|Hewlett Packard|N     |                                                      |
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Tablet         |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Cable STB      |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch    |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|CTA16C5|Android Devices|Y     |Android Devices                                       |
#|CTA16C5|Hewlett Packard|N     |                                                      |
#+-------+---------------+------+------------------------------------------------------+

使用pyspark-sql语法,上面的第一个示例相当于:

df.registerTempTable("df")
query = """
 SELECT *, 
        CASE 
          WHEN Tested = 'Y' 
          THEN COLLECT_LIST(
            CASE 
              WHEN Tested = 'Y' 
              THEN model
            END
          ) OVER (PARTITION BY Dev_No) 
        END AS Tested_devices
   FROM df
"""
sqlCtx.sql(query).show(truncate=False)

关于apache-spark - 通过基于条件连接另一列的值来创建新的 pyspark DataFrame 列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50260067/

相关文章:

python - 不要在 Spark (Python) 中写入 None 或空行

python - Spark 中的分组线性回归

apache-spark - spark 不使用合并模式从不同文件夹读取所有 orc 文件

apache-spark - 计算pyspark数据框中的地理距离

scala - spark 将函数应用于并行列

r - 如何将 SparkR 数据框中的整数列转换为字符串?

apache-spark - 分析异常 : u'Cannot resolve column name

python - Spark中groupBy的使用

sql - scala中的动态where条件生成

java - 从 Java 中的 Spark 数据集中获取唯一单词