python-3.x - 迭代列 PySpark

标签 python-3.x pyspark apache-spark-sql

我有一个包含 40 列的 SQL 表:ID、Product、Product_ID、Date 等,并且希望迭代所有列以获得不同的值。

客户表(示例):

ID Product 
1  gadget
2  VR
2  AR
3  hi-fi

我尝试在循环所有列的函数中使用dropDuplicates,但结果输出只是每列吐出一个不同的值,而不是所有可能的不同值。

预期结果:

Column    Value
ID        1 
ID        2
ID        3
Product   gadget
Product   VR
Product   AR
Product   hi-fi

实际结果:

Column    Value
ID        1 
Product   gadget

最佳答案

这个想法是使用collect_set()获取列中的不同元素,然后分解数据框。

#All columns which need to be aggregated should be added here in col_list.
col_list = ['ID','Product']
exprs = [collect_set(x) for x in col_list]

让我们开始聚合。

from pyspark.sql.functions import lit , collect_set, explode, array, struct, col, substring, length, expr
df = spark.createDataFrame([(1,'gadget'),(2,'VR'),(2,'AR'),(3,'hi-fi')], schema = ['ID','Product'])

df = df.withColumn('Dummy',lit('Dummy'))

#While exploding later, the datatypes must be the same, so we have to cast ID as a String.
df = df.withColumn('ID',col('ID').cast('string'))

#Creating the list of distinct values.
df = df.groupby("Dummy").agg(*exprs)
df.show(truncate=False)
+-----+---------------+-----------------------+
|Dummy|collect_set(ID)|collect_set(Product)   |
+-----+---------------+-----------------------+
|Dummy|[3, 1, 2]      |[AR, VR, hi-fi, gadget]|
+-----+---------------+-----------------------+

def to_transpose(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

df = to_transpose(df, ['Dummy']).drop('Dummy')
df.show()
+--------------------+--------------------+
|                 key|                 val|
+--------------------+--------------------+
|     collect_set(ID)|           [3, 1, 2]|
|collect_set(Product)|[AR, VR, hi-fi, g...|
+--------------------+--------------------+

df = df.withColumn('val', explode(col('val')))
df = df.withColumnRenamed('key', 'Column').withColumnRenamed('val', 'Value')
df = df.withColumn('Column', expr("substring(Column,13,length(Column)-13)"))
df.show()
+-------+------+
| Column| Value|
+-------+------+
|     ID|     3|
|     ID|     1|
|     ID|     2|
|Product|    AR|
|Product|    VR|
|Product| hi-fi|
|Product|gadget|
+-------+------+

注意:所有非字符串的列都应转换为字符串,如 df = df.withColumn('ID',col('ID').cast('string '))。否则,您将收到错误。

关于python-3.x - 迭代列 PySpark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54613692/

相关文章:

python - 使用 numpy 在网格中显示图像的更惯用方式

python - 删除引号python外的字符

python - 在 pyspark 中对列表中的不同数据框列求和的正确方法是什么?

python - 如何优化 Spark SQL 中的非等值连接?

python - 如何在python中导入google的taskqueue库

python - 如何修复 suggest_users() 中的 Tweepy 错误 'Sorry, that page does not exist.'

python - PySpark 将 IntegerTypes 转换为 ByteType 以进行优化

python - 检查 RDD 中是否存在值

python - PySpark:在窗口上加盐并倾斜的 CumSum

apache-spark - 如何合并 Spark SQL 查询的结果以避免出现大量小文件/避免空文件