python - Hadoop/Spark 读取许多 CSV 文件

标签 python csv hadoop apache-spark hdfs

我有很多以非常有意义的方式存储的结构化数据,我想以同样有意义的完整和高效的方式处理它们。

+- some-hdfs-path/
  +- level-1_var-01/
  |  +- level-2_var-001.csv
  |  +- ...
  |  +- level-2_var-nnn.csv
  +- level-1_var-02/
  |  +- level-2_other-001.csv
  |  +- ...
  |  +- level-2_other-mmm.csv
  +- ... /
  +- level-1_var-nn/
  |  +- ...

每个文件大约 100MB,大约有 1,000,000 行。每个目录中的文件数量(通常约为 100 个)各不相同,文件名也各不相同。换句话说,我不知道有多少文件或它们叫什么,但我确实需要它们的名称,显然还有它们的内容。

我无法处理从 sc.textFile("/some-hdfs-path/level-1_var-01/*.csv") 返回的 RDD >sc.wholeTextFiles("/some-hdfs-path/level-1_var-01")

总体目标是实际获取 level-1_var/目录中每个文件的第一行和最后一行。合并每个 level-1_var 的结果,然后返回并在 some-other-hdfs-path/level-1-var/中为每个 level-1_var/写出一组全新的文件

我是 Hadoop/Spark 的新手,使用的是 RDD。我读过 documentation对于上述两个函数,但我仍然对如何迭代我返回的 RDD 并进行处理感到困惑。

编辑:文件包含时间序列数据,因此不希望将每个目录中的文件内容串联起来。我愿意将文件内容作为附加列添加到一个巨大的数据框中,而不是作为行。

最佳答案

通过替换您的配置和属性,使用此代码在 pySpark 中读取 CSV。

from pyspark.sql import SparkSession
from pyspark.sql import Row

def get_first_and_last(filename):
    #rdd variable holds the content of file(it's distributed)
    rdd = spark.read.csv(filename, header=True, mode="DROPMALFORMED").rdd

    #Here filename holds abs path. Feel free to substring as per your needs 
    return Row(filename, rdd.first, rdd.take(rdd.count()).last())


spark = SparkSession \
    .builder \
    .appName("Read CSVs") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# This file list is not distributed one, It holds list of filenames only
filesList = spark.sparkContext\
    .wholeTextFiles("/some-hdfs-path/level-*_var-*/*.csv")\
    .map(lambda x: x[0])\  
    .collect()

#output array
records = filesList.map(get_first_and_last)

for record in records:
    print(record)

我已经在 scala 中尝试了等效代码,并且能够根据需要查看结果。

编辑:根据评论添加了另一种方法。

注意:当使用 sparkContext.wholeTextFiles() 时,小文件是首选,因为每个文件都将完全加载到内存中。 documentation

records = spark.sparkContext\
    .wholeTextFiles("/some-hdfs-path/level-*_var-*/*.csv")\
    .map(lambda x : Row(x[0], x[1].split("\\n")[0], x[1].split("\\n")[-1]))\

for record in records.collect():
    print(record)

pySpark - SparkSession

关于python - Hadoop/Spark 读取许多 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40266465/

相关文章:

java - Hadoop jar 引发反射时使用反射程序运行java.lang.NoClassDefFoundError

python - Visual Studio,Python 不自动缩进

python - 导入pytorch时出现导入错误

perl - 在perl中解析制表符分隔的文件

php - 读取 CSV 文件并存入 MySQL 数据库

hadoop - 将 ORC 文件转换为 Parquet 文件

python - 根据条件 Pandas 重置累积总和

python - 在 Python 中解析深层嵌套的 XML 文件

python - 在 python 中将列标题添加到 csv

hadoop - 使用hadoop hdfs将文件从我的虚拟机上传到另一个虚拟机