python - Pyspark RDD 将当前行与下一行合并,直到当前行长度达到 x

标签 python apache-spark pyspark

背景:

我目前有大文件上传到AWS S3,这些文件在某些​​列中包含换行符,这导致它们被错误读取。但是,这些文件确实有一个非常具体的列分隔符 ~#~。为了删除不正确的换行符,我目前正在通过 awsglue 流式传输文件,如果当前行没有应有的列数,则将每一行与下一行合并。

示例:

取行:"val1"~#~"va\nl\n2"~#~"val3",其呈现方式如下

"val1"~#~"va
l
2"~#~"val3"

逐行进行,使用:

colnum=3
for row in f:
    while not len(row.split('~#~'))==colnum:
        row += next(f)
cleanrow = row.replace('\n','. ')+'\n

cleanrow 示例将返回一行上的示例,如下所示预期输出:

“val1”~#~“va.l.2”~#~“val3”

问题:

目前,即使计算机位于 AWS 网络上,通过计算机流式传输这些大文件来清理它们也需要很长时间。因此,我考虑使用 pyspark 为此,我尝试设置自定义换行符,如下所示 spark._jsc.hadoopConfiguration().set("textinputformat.record.delimiter","\"\n") 但问题是,事实证明我们也可以在文本字段中包含 '"\n',这意味着并非所有行都得到修复。我是 pyspark 的新手,所以不太确定从哪里开始。我尝试过 mapflatMapreduce 但似乎这不是我所追求的,因为它们似乎只使用当前行,或将所有行合并为一行。我发现的最接近的 SO 是 this post它使用 sliding 函数,但问题与我想要实现的目标有点不同,我在 pyspark 中找不到该函数的任何文档,只有 scala.

关于如何使用可以在 AWSglue 中实现的其他工具解决新线路问题的其他建议,不涉及流式传输数据集,我们将受到欢迎。 (文件太大,内存无法容纳)

最佳答案

我设法解决了我的问题

#first I read in the data
rdd = spark.sparkContext.textFile(MessyFile)

#the first line is expected to have the correct number of columns (no linebreaks within a column)
cols = len(rdd.first().split("~#~"))

#I store the already "correct" rows in one RDD, and the incorrect ones in a different RDD
correct = rdd.filter(lambda x: len(x.split("~#~"))==cols)
wrong = rdd.filter(lambda x: len(x.split("~#~"))!=cols)

#The incorrect rows are now so small that they will fit in memory, so I can make RDD into an iterable list
fix = iter(wrong.collect())
fixed = []

#I then iterate over all the rows in the incorrect list and add next row until the row has the expected number of columns, and I add ". " to indicate where there was a linebreak
#The new rows are added to a new list called fixed
for item in fix:
    row = item
    while len(row.split("~#~"))!=cols:
        row+='. '+next(fix)
    fixed.append(row)

#I then union the already correct rows with the newly fixed rows
new = correct.union(spark.sparkContext.parallelize(fixed)) \
        .map(lambda row: row.split("~#~"))

#I then create a dataframe, assing the first row as header and write it out as a parquet file
header = new.first()
df = new.filter(lambda line: line != header).toDF()
oldcols = df.columns

df = reduce(lambda df, idx:df.withColumnRenamed(oldcols[idx],header[idx]),range(len(oldcols)),df)

df.coalesce(10).write.parquet(CleanFile,mode='overwrite') 

我能想到的唯一问题是,错误行的数量是否超过内存的容纳范围(不太可能),或者第一列或最后一列中有换行符(在我的文件中不太可能)

关于python - Pyspark RDD 将当前行与下一行合并,直到当前行长度达到 x,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53845157/

相关文章:

python - q, w = 1, 2 如果 1 < 2 否则 2, 1; ValueError : too many values to unpack. 为什么?

python - 对同一列应用不同的重采样方法(pandas)

python - 如何将对象作为参数传递给函数?

python - Apache Spark - 将 UDF 的结果分配给多个数据框列

python - numpy 多维索引和对角线对称

scala - yarn 上的 Spark ;如何将指标发送到 Graphite 水槽?

hadoop - Spark Hive Context - 带有分区和大写字段名称的 Avro 表

apache-spark - Spark 性能从 Dataframe 保存到 hdfs 或 hive 的大型数据集

python - 使用数组的数组分解列 - PySpark

python - 从 Databrick 文件系统读取文件