csv - 将 csv 文件与不匹配的列合并

标签 csv apache-spark pyspark spark-dataframe data-analysis

我需要将多个 csv 文件组合成一个对象(我假设是一个数据框),但它们都有不匹配的列,如下所示:

CSV A

store_location_key | product_key | collector_key | trans_dt | sales | units | trans_key

CSV B

collector_key | trans_dt | store_location_key | product_key | sales | units | trans_key

CSV C

collector_key | trans_dt | store_location_key |product_key | sales | units | trans_id

最重要的是,我需要将这些与具有匹配列的另外两个 csv 文件相匹配:

位置 CSV

store_location_key | region | province | city | postal_code | banner | store_num

产品 CSV

product_key | sku | item_name | item_description | department | category

数据类型都是一致的,即 sales 列始终为 float,store_location_key 始终为 int 等。即使我先将每个 csv 转换为数据框,我也不确定 join 会工作(除了最后两个),因为列需要匹配的方式。

最佳答案

要合并前三个 CSV 文件,首先将它们分别读取为数据帧,然后使用 union。使用 union 时列的顺序和数量很重要,因此首先您需要将任何缺失的列添加到 DataFrame,然后使用 select 确保列在相同的位置订单。

all_columns = ['collector_key', 'trans_dt', 'store_location_key', 'product_key', 'sales', 'units', 'trans_key', 'trans_id']

dfA = (spark.read.csv("a.csv", header=True)
  .withColumn(trans_id, lit(null))
  .select(all_columns))
dfB = (spark.read.csv("b.csv", header=True)
  .withColumn(trans_id, lit(null))
  .select(all_columns))
dfC = (spark.read.csv("c.csv", header=True)
  .withColumn(trans_key, lit(null))
  .select(all_columns))

df = dfA.union(dfB).union(dfC)

注意:如果 CSV 文件的列顺序/数量相同,则可以使用单个 spark.read 操作轻松组合它们。

合并前三个 CSV 后,剩下的就很简单了。位置和产品 CSV 都可以使用 join 与其余部分组合。

df_location = spark.read.csv("location.csv", header=True)
df_product = spark.read.csv("product.csv", header=True)

df2 = df.join(df_location, df.store_location_key == df_location.store_location_key)
df3 = df2.join(df_product, df2.product_key == df_product.product_key)

关于csv - 将 csv 文件与不匹配的列合并,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48999381/

相关文章:

java - Spark flatMap/减少 : How to scale and avoid OutOfMemory?

python - 使用python3将json数据转换为csv但不起作用

javascript - 类型错误:无法读取未定义的 D3.js 的属性 '1'

scala - Spark:使用Scala进行HBase批量加载

python - Pyspark:如何从时间戳中提取小时

python - "normalize"将句子的数据帧转换为更大的单词数据帧

python - 从 Pyspark Dataframe 列中提取文件扩展名

Python:正则表达式使用re.search

r - 从 R 导出 data.frame 为 .csv 后日期格式发生变化

intellij-idea - 为什么IDEA无法识别Spark jar文件?