背景:
我目前有大文件上传到AWS S3,这些文件在某些列中包含换行符,这导致它们被错误读取。但是,这些文件确实有一个非常具体的列分隔符 ~#~
。为了删除不正确的换行符,我目前正在通过 awsglue 流式传输文件,如果当前行没有应有的列数,则将每一行与下一行合并。
示例:
取行:"val1"~#~"va\nl\n2"~#~"val3"
,其呈现方式如下
"val1"~#~"va
l
2"~#~"val3"
逐行进行,使用:
colnum=3
for row in f:
while not len(row.split('~#~'))==colnum:
row += next(f)
cleanrow = row.replace('\n','. ')+'\n
cleanrow 示例将返回一行上的示例,如下所示预期输出:
“val1”~#~“va.l.2”~#~“val3”
问题:
目前,即使计算机位于 AWS 网络上,通过计算机流式传输这些大文件来清理它们也需要很长时间。因此,我考虑使用 pyspark 为此,我尝试设置自定义换行符,如下所示 spark._jsc.hadoopConfiguration().set("textinputformat.record.delimiter","\"\n")
但问题是,事实证明我们也可以在文本字段中包含 '"\n'
,这意味着并非所有行都得到修复。我是 pyspark 的新手,所以不太确定从哪里开始。我尝试过 map
、flatMap
和 reduce
但似乎这不是我所追求的,因为它们似乎只使用当前行,或将所有行合并为一行。我发现的最接近的 SO 是 this post它使用 sliding
函数,但问题与我想要实现的目标有点不同,我在 pyspark 中找不到该函数的任何文档,只有 scala.
关于如何使用可以在 AWSglue 中实现的其他工具解决新线路问题的其他建议,不涉及流式传输数据集,我们将受到欢迎。 (文件太大,内存无法容纳)
最佳答案
我设法解决了我的问题
#first I read in the data
rdd = spark.sparkContext.textFile(MessyFile)
#the first line is expected to have the correct number of columns (no linebreaks within a column)
cols = len(rdd.first().split("~#~"))
#I store the already "correct" rows in one RDD, and the incorrect ones in a different RDD
correct = rdd.filter(lambda x: len(x.split("~#~"))==cols)
wrong = rdd.filter(lambda x: len(x.split("~#~"))!=cols)
#The incorrect rows are now so small that they will fit in memory, so I can make RDD into an iterable list
fix = iter(wrong.collect())
fixed = []
#I then iterate over all the rows in the incorrect list and add next row until the row has the expected number of columns, and I add ". " to indicate where there was a linebreak
#The new rows are added to a new list called fixed
for item in fix:
row = item
while len(row.split("~#~"))!=cols:
row+='. '+next(fix)
fixed.append(row)
#I then union the already correct rows with the newly fixed rows
new = correct.union(spark.sparkContext.parallelize(fixed)) \
.map(lambda row: row.split("~#~"))
#I then create a dataframe, assing the first row as header and write it out as a parquet file
header = new.first()
df = new.filter(lambda line: line != header).toDF()
oldcols = df.columns
df = reduce(lambda df, idx:df.withColumnRenamed(oldcols[idx],header[idx]),range(len(oldcols)),df)
df.coalesce(10).write.parquet(CleanFile,mode='overwrite')
我能想到的唯一问题是,错误行的数量是否超过内存的容纳范围(不太可能),或者第一列或最后一列中有换行符(在我的文件中不太可能)
关于python - Pyspark RDD 将当前行与下一行合并,直到当前行长度达到 x,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53845157/