python - PySpark 对已排序的内容进行排序

标签 python apache-spark dataframe pyspark

请帮助初学者。 用于下单的常用数据库,全部在一张表中。

使用 Python 分析 Apache Spark 中的数据。想要编写一个查询来提取按电子邮件排序的客户的所有交易,这些客户订购了现已停产的产品,并且有尚未发货的订单。基本上使用“&item_in_list(F.lit("NotShipped"), ShippedStatus)”它不起作用。

%python
import pyspark.sql.functions as F
from pyspark.sql.types import *

list_len = F.udf(lambda x: len(x), IntegerType())
item_in_list = F.udf(lambda x, y: x in y, BooleanType())
df = spark.sql("select * from orderdb")
df1 = df.select("email", "OrderedProduct","ShippedStatus").groupBy("email")
df1 = df1.agg(F.collect_set("OrderedProduct"))\
       .withColumnRenamed("collect_set(OrderedProduct)", "OrderedProduct")
df1 = df1.filter((list_len(df1.OrderedProduct) > 1) & 
               item_in_list(F.lit("DiscontinuedProduct"), OrderedProduct) 
        &item_in_list(F.lit("NotShipped"), ShippedStatus)

df1 = df1.select("email")
df = df1.join(df, "email", "left_outer")
display(df)

ID 字符串为空 日期 日期时间戳 null 订购产品字符串为空 ShippedStatus bool 值 null

最佳答案

首先,udf在pyspark中的表现非常糟糕。如果您想更改类型,请使用如下内容:

from pyspark.sql.types import IntegerType

df = df.withColumn("column", df["column"].cast(IntegerType()))

话虽这么说,我们需要一个可重现的示例,但我想您可以使用 'where' 子句来解决它。

# Your code
df1 = df1.filter((list_len(df1.OrderedProduct) > 1) & 
               item_in_list(F.lit("DiscontinuedProduct"), OrderedProduct) 
        &item_in_list(F.lit("NotShipped"), ShippedStatus)

# My code
condition1 = F.col('OrderedProduct') > 1
condition2 = F.col('ShippedStatus') == F.lit('NotShipped')
condition3 = F.col('OrderedProduct') == F.lit('DiscontinuedProduct')

df1 = df1.where(condition 1 & condition2 & condition3)

关于python - PySpark 对已排序的内容进行排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53704073/

相关文章:

apache-spark - 如何在 Apache Spark/Hive 中合并 DataFrame,然后增加版本

python - 比较 Pandas Dataframe 的匹配行和列的差异

Python for 循环优化

python - 使用 lambda 作为约束函数

scala - 如何在 Spark 中将 RDD<String> 转换为 RDD<Vector>?

返回整数数组的 Java Spark UDF 给我 ClassException

python - 按数据框重新排序 pandas group

python - 将未知数量的列表打印为列

python - ValueError : not enough values to unpack (expected 3, 在 Pytorch 中得到 2)

python - 运行 celeryd 时出错