python - 在pyspark中加入具有相同列名的数据框

标签 python apache-spark pyspark apache-spark-sql apache-spark-2.0

我有两个数据框,它们是从两个 csv 文件中读取的。

+---+----------+-----------------+
| ID|  NUMBER  |  RECHARGE_AMOUNT|
+---+----------+-----------------+
|  1|9090909092|               30|
|  2|9090909093|               30|
|  3|9090909090|               30|
|  4|9090909094|               30|
+---+----------+-----------------+

+---+----------+-----------------+
| ID|  NUMBER  |  RECHARGE_AMOUNT|
+---+----------+-----------------+
|  1|9090909092|               40|
|  2|9090909093|               50|
|  3|9090909090|               60|
|  4|9090909094|               70|
+---+----------+-----------------+

我正在尝试使用 pyspark 代码 dfFinal = dfFinal.join(df2, on=['NUMBER'], how='inner') 和 new 使用 NUMBER coumn 连接这两个数据数据帧生成如下。

+----------+---+-----------------+---+-----------------+
|  NUMBER  | ID|  RECHARGE_AMOUNT| ID|  RECHARGE_AMOUNT|
+----------+---+-----------------+---+-----------------+
|9090909092|  1|               30|  1|               40|
|9090909093|  2|               30|  2|               50|
|9090909090|  3|               30|  3|               60|
|9090909094|  4|               30|  4|               70|
+----------+---+-----------------+---+-----------------+

但是我无法将此数据帧写入文件,因为加入后的数据帧具有重复的列。我正在使用以下代码。 dfFinal.coalesce(1).write.format('com.databricks.spark.csv').save('/home/user/output',header = 'true') 有什么办法吗以避免加入 Spark 后重复列。下面给出的是我的 pyspark 代码。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("test1").getOrCreate()
files = ["/home/user/test1.txt", "/home/user/test2.txt"]
dfFinal = spark.read.load(files[0],format="csv", sep=",", inferSchema="false", header="true", mode="DROPMALFORMED")
dfFinal.show()
for i in range(1,len(files)):
    df2 = spark.read.load(files[i],format="csv", sep=",", inferSchema="false", header="true", mode="DROPMALFORMED")
    df2.show()
    dfFinal = dfFinal.join(df2, on=['NUMBER'], how='inner')
dfFinal.show()
dfFinal.coalesce(1).write.format('com.databricks.spark.csv').save('/home/user/output',header = 'true')

我需要生成唯一的列名称。即:如果我在文件数组中给出了具有相同列的两个文件,它应该生成如下。

+----------+----+-------------------+-----+-------------------+
|  NUMBER  |IDx |  RECHARGE_AMOUNTx | IDy |  RECHARGE_AMOUNTy |
+----------+----+-------------------+-----+-------------------+
|9090909092|  1 |               30  |  1  |               40  |
|9090909093|  2 |               30  |  2  |               50  |
|9090909090|  3 |               30  |  3  |               60  |
|9090909094|  4 |               30  |  4  |               70  |
+----------+---+-----------------+---+------------------------+

在 panda 中,我可以使用 suffixes 参数,如下所示 dfFinal = dfFinal.merge(df2,left_on='NUMBER',right_on='NUMBER',how='inner',suffixes =('x', 'y'),sort=True) 这将生成上述数据帧。有什么方法可以在 pyspark 上复制这个吗?

最佳答案

您可以从每个数据框中选择列并为其指定别名。
像这样。

dfFinal = dfFinal.join(df2, on=['NUMBER'], how='inner') \
                 .select('NUMBER',
                         dfFinal.ID.alias('ID_1'),
                         dfFinal.RECHARGE_AMOUNT.alias('RECHARGE_AMOUNT_1'),
                         df2.ID.alias('ID_2'),
                         df2.RECHARGE_AMOUNT.alias('RECHARGE_AMOUNT_2'))

关于python - 在pyspark中加入具有相同列名的数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52622076/

相关文章:

python - 如何在 Python 中编辑纯文本文件?

python - 使用 "Restarting & Run All"时未应用 Jupyter notebook custom.js

scala - 如何使用scala规范化或标准化spark中具有多个列/变量的数据?

python - 无法导入 SparkContext

apache-spark - 读取分区 Parquet 时,Spark 错误地将以 'd' 或 'f' 结尾的分区名称解释为数字

python - Tight_Layout : Attribute Error . 'AxesSubplot' 对象没有属性 'tight_layout'

python - 索引和 for 循环解决方法中没有日期

algorithm - Spark : What is the time complexity of the connected components algorithm used in GraphX?

macos - 如何设置 mesos 在独立 OS/X 上运行 Spark

apache-spark - 通过基于条件连接另一列的值来创建新的 pyspark DataFrame 列