python - Adding columns from different dataframes to a target dataframe in PySpark

Tags: python apache-spark pyspark

I have some dataframes like this:

 rdd_1 = sc.parallelize([(0,10,"A",2), (1,20,"B",1), (2,30,"A",2)])
 rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,3213,"201602"),(1,20,3003,"201601"), (1,20,9872,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2,61, 2321,"201601")])
 df_tg = sqlContext.createDataFrame(rdd_1, ["id", "type", "route_a", "route_b"])
 df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"])

 df_tg.show()


+---+----+-------+-------+
| id|type|route_a|route_b|
+---+----+-------+-------+
|  0|  10|      A|      2|
|  1|  20|      B|      1|
|  2|  30|      A|      2|
+---+----+-------+-------+

df_data.show()   

+---+----+----+------+
| id|type|cost|  date|
+---+----+----+------+
|  0|  10| 223|201603|
|  0|  10|  83|201602|
|  1|  20|3003|201601|
|  1|  20|3213|201602|
|  1|  20|9872|201603|
|  2|  30|  10|201602|
|  2|  30|  62|201601|
|  2|  40|2321|201601|
+---+----+----+------+

So I need to add columns like this:

+---+----+-------+-------+-----------+-----------+-----------+
| id|type|route_a|route_b|cost_201603|cost_201602|cost_201601|
+---+----+-------+-------+-----------+-----------+-----------+
|  0|  10|      A|      2|        223|         83|       None|
|  1|  20|      B|      1|       9872|       3213|       3003|
|  2|  30|      A|      2|       None|         10|         62|
+---+----+-------+-------+-----------+-----------+-----------+

To do that, I have to perform a few joins:

df_tg = df_tg.join(df_data[df_data.date == "201603"], ["id", "type"])

which means I also have to rename the columns so they don't get overwritten:

df_tg = df_tg.join(df_data[df_data.date == "201603"], ["id", "type"]).withColumnRenamed("cost","cost_201603")

I could write a function to do this, but I would have to loop over the available dates and columns, generating a large number of joins and full table scans:

def feature_add(df_target, df_feat, feat_cols, period):
    # For each month, join that month's rows and append each feature column,
    # renamed with the month as a suffix (e.g. cost_201603).
    for ref_month in period:
        df_month = df_feat[df_feat.date == ref_month]
        for feat_col in feat_cols:
            df_target = df_target.join(df_month, ["id", "type"]).select(
                    *[df_target[column] for column in df_target.columns] + [df_month[feat_col]]
                    ).withColumnRenamed(feat_col, feat_col + '_' + ref_month)
    return df_target

df_tg = feature_add(df_tg, df_data, ["cost"], ["201602", "201603", "201601"])

This works, but it's ugly. How can I add these columns, including when I call the same function for other dataframes? Note that the columns don't line up exactly, and I need an inner join.

Best Answer

I suggest using the pivot function, like this:

from pyspark.sql.functions import *

rdd_1 = sc.parallelize([(0,10,"A",2), (1,20,"B",1), (2,30,"A",2)])
rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,3213,"201602"),(1,20,3003,"201601"), (1,20,9872,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2,61, 2321,"201601")])
df_tg = sqlContext.createDataFrame(rdd_1, ["id", "type", "route_a", "route_b"])
df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"])

pivot_df_data = df_data.groupBy("id","type").pivot("date").agg({"cost" : "sum"})

pivot_df_data.join(df_tg, ['id','type'], 'inner').select('id','type','route_a','route_b','201601','201602','201603','2016032').show()

# +---+----+-------+-------+------+------+------+-------+
# | id|type|route_a|route_b|201601|201602|201603|2016032|
# +---+----+-------+-------+------+------+------+-------+
# |  0|  10|      A|      2|   223|  null|  null|     83|
# |  1|  20|      B|      1|  3003|  3213|  9872|   null|
# |  2|  30|      A|      2|  null|    10|  null|   null|
# +---+----+-------+-------+------+------+------+-------+
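If you want the pivoted columns to carry the cost_ prefix from the desired output, a possible follow-up (a sketch building on the answer above, assuming the list of months is known in advance) is to pass the dates to pivot() explicitly, which also lets Spark skip the extra pass it otherwise needs to discover the distinct pivot values, and then rename the resulting columns before joining:

# Sketch: pivot only the known months and prefix the pivoted columns with
# "cost_" so the result matches the desired schema. The month list is an
# assumption; adjust it to the dates you actually care about.
months = ["201601", "201602", "201603"]

pivot_df_data = df_data.groupBy("id", "type").pivot("date", months).agg({"cost": "sum"})

# Every column except the grouping keys "id" and "type" is a pivoted date.
for m in months:
    pivot_df_data = pivot_df_data.withColumnRenamed(m, "cost_" + m)

pivot_df_data.join(df_tg, ["id", "type"], "inner").show()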

Regarding "python - Adding columns from different dataframes to a target dataframe in PySpark", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37660680/
