我在 pyspark
中有一个数据框,如下所示
df.show()
+-------+--------------------+--------------------+
| Dev_No| model| Tested|
+-------+--------------------+--------------------+
|BTA16C5| Windows PC| N|
|BTA16C5| SRL| N|
|BTA16C5| Hewlett Packard| N|
|CTA16C5| Android Devices| Y|
|CTA16C5| Hewlett Packard| N|
|4MY16A5| Other| N|
|4MY16A5| Other| N|
|4MY16A5| Tablet| Y|
|4MY16A5| Other| N|
|4MY16A5| Cable STB| Y|
|4MY16A5| Other| N|
|4MY16A5| Windows PC| Y|
|4MY16A5| Windows PC| Y|
|4MY16A5| Smart Watch| Y|
+-------+--------------------+--------------------+
现在使用上面的数据框,我想使用名为 Tested_devices
的 newcolumn
创建下面的数据框,并用每个 Dev_No< 的值填充该列
选择 model
,其中 Tested
为 Y
并以逗号分隔填充所有值。
df1.show()
+-------+--------------------+--------------------+------------------------------------------------------+
| Dev_No| model| Tested| Tested_devices|
+-------+--------------------+--------------------+------------------------------------------------------+
|BTA16C5| Windows PC| N| |
|BTA16C5| SRL| N| |
|BTA16C5| Hewlett Packard| N| |
|CTA16C5| Android Devices| Y| Android Devices|
|CTA16C5| Hewlett Packard| N| |
|4MY16A5| Other| N| |
|4MY16A5| Other| N| |
|4MY16A5| Tablet| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5| Other| N| |
|4MY16A5| Cable STB| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5| Other| N| |
|4MY16A5| Windows PC| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5| Windows PC| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5| Smart Watch| Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
+-------+--------------------+--------------------+------------------------------------------------------+
我尝试了类似下面的方法来选择Dev_No
和model
,其中Tested
是Y
a = df.select("Dev_No", "model"), when(df.Tested == 'Y')
我无法得到结果。它给了我以下错误
TypeError: when() takes exactly 2 arguments (1 given)
怎样才能实现我想要的
最佳答案
更新
对于 Spark 1.6,您将需要一种替代方法。在不使用 udf 或任何 Window 函数的情况下执行此操作的一种方法是使用收集的值创建第二个临时 DataFrame,然后将其连接回原始 DataFrame。
首先按两者Dev_No
和Tested
进行分组,并使用concat_ws
和collect_list
进行聚合>。聚合后,仅过滤测试设备的 DataFrame。
import pyspark.sql.functions as f
# create temporary DataFrame
df2 = df.groupBy('Dev_No', 'Tested')\
.agg(f.concat_ws(", ", f.collect_list('model')).alias('Tested_devices'))\
.where(f.col('Tested') == 'Y')
df2.show(truncate=False)
#+-------+------+------------------------------------------------------+
#|Dev_No |Tested|Tested_devices |
#+-------+------+------------------------------------------------------+
#|CTA16C5|Y |Android Devices |
#|4MY16A5|Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+------+------------------------------------------------------+
现在使用 Dev_No
和 Tested
列作为连接,将 df
与 df2
进行左连接键:
df.join(df2, on=['Dev_No', 'Tested'], how='left')\
.select('Dev_No', 'model', 'Tested', 'Tested_devices')\
.show(truncate=False)
最后使用 select
的目的是为了显示目的而以与原始 DataFrame 相同的顺序获取列 - 如果您选择,可以删除此步骤。
这将产生以下输出(与下面的输出相同(使用 concat_ws
):
#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model |Tested|Tested_devices |
#+-------+---------------+------+------------------------------------------------------+
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|CTA16C5|Hewlett Packard|N |null |
#|BTA16C5|Windows PC |N |null |
#|BTA16C5|SRL |N |null |
#|BTA16C5|Hewlett Packard|N |null |
#|CTA16C5|Android Devices|Y |Android Devices |
#|4MY16A5|Tablet |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Cable STB |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+---------------+------+------------------------------------------------------+
原始答案:(适用于 Spark 的更高版本)
您可以通过使用两个 pyspark.sql.functions.when()
来实现此目的语句 - 其中之一位于对 pyspark.sql.functions.collect_list()
的调用中超过Window
,利用默认 null
值 does not get added to the list 的事实:
from pyspark.sql import Window
import pyspark.sql.functions as f
df.select(
"*",
f.when(
f.col("Tested") == "Y",
f.collect_list(
f.when(
f.col("Tested") == "Y",
f.col('model')
)
).over(Window.partitionBy("Dev_No"))
).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+--------------------------------------------------------+
#|Dev_No |model |Tested|Tested_devices |
#+-------+---------------+------+--------------------------------------------------------+
#|BTA16C5|Windows PC |N |null |
#|BTA16C5|SRL |N |null |
#|BTA16C5|Hewlett Packard|N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Tablet |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other |N |null |
#|4MY16A5|Cable STB |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other |N |null |
#|4MY16A5|Windows PC |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Windows PC |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Smart Watch |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|CTA16C5|Android Devices|Y |[Android Devices] |
#|CTA16C5|Hewlett Packard|N |null |
#+-------+---------------+------+--------------------------------------------------------+
如果您希望输出与问题中显示的完全相同 - 作为逗号分隔值的字符串而不是列表和空字符串而不是 null
- 您可以稍微修改一下,如下所示:
使用pyspark.sql.functions.concat_ws
将collect_list
的输出连接成一个字符串。我使用 ", "
作为分隔符。这相当于在 python 中执行 ", ".join(some_list)
。接下来,我们将 .otherwise(f.lit(""))
添加到外部 when()
调用的末尾,以指定我们要返回文字空如果条件为 False
,则为字符串。
df.select(
"*",
f.when(
f.col("Tested") == "Y",
f.concat_ws(
", ",
f.collect_list(
f.when(
f.col("Tested") == "Y",
f.col('model')
)
).over(Window.partitionBy("Dev_No"))
)
).otherwise(f.lit("")).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model |Tested|Tested_devices |
#+-------+---------------+------+------------------------------------------------------+
#|BTA16C5|Windows PC |N | |
#|BTA16C5|SRL |N | |
#|BTA16C5|Hewlett Packard|N | |
#|4MY16A5|Other |N | |
#|4MY16A5|Other |N | |
#|4MY16A5|Tablet |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other |N | |
#|4MY16A5|Cable STB |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other |N | |
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|CTA16C5|Android Devices|Y |Android Devices |
#|CTA16C5|Hewlett Packard|N | |
#+-------+---------------+------+------------------------------------------------------+
使用pyspark-sql
语法,上面的第一个示例相当于:
df.registerTempTable("df")
query = """
SELECT *,
CASE
WHEN Tested = 'Y'
THEN COLLECT_LIST(
CASE
WHEN Tested = 'Y'
THEN model
END
) OVER (PARTITION BY Dev_No)
END AS Tested_devices
FROM df
"""
sqlCtx.sql(query).show(truncate=False)
关于apache-spark - 通过基于条件连接另一列的值来创建新的 pyspark DataFrame 列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50260067/