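I have a large tsv file (~2.5 GB). I iterate over its lines, each of which has six fields. I take the first field of each line and append the line to a csv file named after that field. The goal is to end up with the lines of the master tsv sorted into separate csv files, one per first-field value.
This works on a small file, but when I run it on the large file the IPython console never finishes. The file I am saving to looks as if it is being filled, but when I open it nothing is shown.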
import csv

file_path = ".../master.tsv"

with open(file_path, 'r') as masterfile:
    for line in masterfile:
        line_split = line.split("|")
        cik = line_split[0].zfill(10)

        save_path = ".../data-sorted/"
        save_path += cik + ".csv"

        with open(save_path, 'a') as savefile:
            wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
            wr.writerow(line_split)
Best answer
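Your code is very inefficient because it opens an output file and appends to it for each and every line of the input file it processes. With an input file this large that happens a tremendous number of times, and the operating system calls needed to open and close a file are relatively slow.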
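To get a feel for that cost, here is a rough sketch that writes the same rows twice: once re-opening the file for every row, as in the question, and once through a single open handle. The function names and the made-up rows are assumptions for illustration only, and the timings will vary a lot between machines and file systems.

```python
import os
import tempfile
import time


def write_reopening(path, rows):
    """Open and close the file once per row (the pattern in the question)."""
    for row in rows:
        with open(path, 'a') as f:
            f.write(row + "\n")


def write_once(path, rows):
    """Open the file a single time and write every row through one handle."""
    with open(path, 'a') as f:
        for row in rows:
            f.write(row + "\n")


def timed(func, path, rows):
    """Return the wall-clock seconds func takes to write rows to path."""
    start = time.perf_counter()
    func(path, rows)
    return time.perf_counter() - start


if __name__ == '__main__':
    # Made-up rows in the same pipe-delimited shape as the input file.
    rows = ["{}|a|b|c|d|e".format(i) for i in range(20000)]
    with tempfile.TemporaryDirectory() as tmp:
        slow = timed(write_reopening, os.path.join(tmp, "slow.csv"), rows)
        fast = timed(write_once, os.path.join(tmp, "fast.csv"), rows)
    print("reopen per row: {:.3f}s, open once: {:.3f}s".format(slow, fast))
```

Both functions produce identical files; only the number of open/close calls differs.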
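In addition, one line in your code deserves a closer look, namely:
    save_path += cik + ".csv"
If save_path were set only once, outside the loop, this would just keep making save_path longer and longer, which isn't what's needed. In the code as posted it is reset on every iteration, so the path comes out right, but building the full path from scratch each time (for example with os.path.join) avoids the risk entirely.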
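A minimal sketch of building the path fresh for each line might look like the following. The helper name build_save_path and the directory name are placeholders, not something from the question.

```python
import os


def build_save_path(base_dir, cik):
    """Return a fresh output path for one key; base_dir is never modified."""
    return os.path.join(base_dir, cik.zfill(10) + ".csv")


if __name__ == '__main__':
    base_dir = "data-sorted"  # Placeholder directory name.
    for cik in ("1750", "320193"):
        print(build_save_path(base_dir, cik))
```

Because the base directory is only read and never appended to, the result cannot grow from one iteration to the next.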
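Regardless, here's something that should work significantly faster, although it will likely still take a fairly long time to process a file that large. It speeds things up by caching intermediate results. It does this by opening the different output csv files, and creating their corresponding csv.writer objects, as infrequently as possible: the first time they're needed, and again only after they were closed because the cache reached its maximum length.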
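Note that the cache may itself consume a lot of memory, depending on how many unique csv output files there are and how many of them can be open at the same time, but using that memory is what makes it run faster. You'll need to experiment and manually adjust the MAX_OPEN value to find the best trade-off between speed and memory usage, all while staying below your operating system's limit on how many files may be open at one time.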
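As a starting point for that tuning, on Unix-like systems the per-process limit can be read with the standard resource module. The sketch below is only an assumption about how one might derive an initial MAX_OPEN; the helper name suggest_max_open and its reserve and fallback parameters are made up for illustration, and the resource module is not available on Windows.

```python
def suggest_max_open(reserve=64, fallback=256):
    """Suggest how many output files to keep open at once.

    reserve leaves headroom for stdin/stdout, the input file and so on.
    fallback is returned when no finite limit can be determined.
    """
    try:
        import resource  # Unix-only standard library module.
    except ImportError:
        return fallback
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return fallback
    return max(1, soft - reserve)


if __name__ == '__main__':
    print("Suggested MAX_OPEN:", suggest_max_open())
```

The value it returns is only an upper bound to stay under; the best setting for speed versus memory still has to be found by experiment.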
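Also note that it might be possible to make this even more efficient by choosing which existing file entry to close more intelligently, rather than just picking an open one at random. Whether that would really help, however, depends on whether the data in the input file has any advantageous grouping or other ordering.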
import csv
import os
import random


class CSVWriterCache(dict):
    """ Dict subclass to cache pairs of csv files and associated
        csv.writers. When a specified maximum number of them already
        exist, a random one is closed, but an entry for it is retained
        and marked "closed" so it can be re-opened in append mode
        later if it's ever referenced again. This limits the number of
        files open at any given time.
    """
    _CLOSED = None  # Marker to indicate that file has been seen before.

    def __init__(self, max_open, **kwargs):
        super().__init__()
        self.max_open = max_open
        self.cur_open = 0  # Number of currently opened csv files.
        self.csv_kwargs = kwargs  # Keyword args for csv.writer.

    # Adding the next two non-dict special methods makes the class a
    # context manager which allows it to be used in "with" statements
    # to do automatic clean-up.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __getitem__(self, k):
        if k not in self:
            return self.__missing__(k)
        entry = self.get(k)  # dict.get() bypasses this method.
        if entry is self._CLOSED:  # Needs to be re-opened in append mode.
            entry = self._open(k, 'a')
        return entry

    def __missing__(self, csv_file_path):
        """ Create a csv.writer corresponding to the file path and add it
            and the file to the cache.
        """
        return self._open(csv_file_path, 'w')

    def _open(self, csv_file_path, mode):
        """ Open the file in the given mode and cache it together with
            its csv.writer, first closing another file if at the limit.
        """
        if self.cur_open >= self.max_open:  # Limit?
            # Randomly choose a cached entry with a previously seen
            # file path that is still open (not _CLOSED). The associated
            # file is then closed, but the entry for the file path is
            # left in the dictionary so it can be recognized as having
            # been seen before and be re-opened in append mode.
            open_keys = [key for key, entry in self.items()
                         if entry is not self._CLOSED]
            rand_entry = random.choice(open_keys)
            self.get(rand_entry)[1].close()
            self.cur_open -= 1
            # Mark as previously seen but closed.
            super().__setitem__(rand_entry, self._CLOSED)

        csv_file = open(csv_file_path, mode, newline='')
        csv_writer = csv.writer(csv_file, **self.csv_kwargs)
        self.cur_open += 1

        # Add pair to cache.
        super().__setitem__(csv_file_path, (csv_writer, csv_file))
        return csv_writer, csv_file

    # Added, non-standard dict method.
    def close(self):
        """ Close all the opened files in the cache and clear it out. """
        for key, entry in self.items():
            if entry is not self._CLOSED:
                entry[1].close()
                self[key] = self._CLOSED  # Not strictly necessary.
                self.cur_open -= 1  # For sanity check at end.
        self.clear()
        assert self.cur_open == 0  # Sanity check.


if __name__ == '__main__':
    file_path = "./master.tsv"
    save_path = "./data-sorted"
    MAX_OPEN = 1000  # Number of opened files allowed (max is OS-dependent).
    # MAX_OPEN = 2  # Use small value for testing.

    # Create output directory if it does not exist.
    if os.path.exists(save_path):
        if not os.path.isdir(save_path):
            raise RuntimeError(
                "Path {!r} exists, but isn't a directory".format(save_path))
    else:
        print('Creating directory: {!r}'.format(save_path))
        os.makedirs(save_path)

    # Process the input file using a cache of csv.writers.
    with open(file_path, 'r') as masterfile, \
         CSVWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as csv_writer_cache:
        for line in masterfile:
            line_split = line.rstrip().split("|")
            cik = line_split[0].zfill(10)
            save_file_path = os.path.join(save_path, cik + ".csv")
            writer = csv_writer_cache[save_file_path][0]
            writer.writerow(line_split)

    print('{!r} file processing completed'.format(os.path.basename(file_path)))
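As for picking the file to close more intelligently, one possible policy is least-recently-used (LRU) eviction, which tends to help when lines for the same key arrive close together. The following is only a sketch of that idea, not part of the code above: the class name LRUWriterCache and its writer() method are made up for illustration, and it assumes max_open is at least 1.

```python
import csv
from collections import OrderedDict


class LRUWriterCache:
    """Keep at most max_open csv files open, closing the least recently used."""

    def __init__(self, max_open, **csv_kwargs):
        self.max_open = max_open    # Assumed to be >= 1.
        self.csv_kwargs = csv_kwargs
        self._open = OrderedDict()  # path -> (writer, file), oldest first.
        self._seen = set()          # Paths already created during this run.

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def writer(self, path):
        """Return a csv.writer for path, opening or re-opening it as needed."""
        if path in self._open:
            self._open.move_to_end(path)  # Mark as most recently used.
            return self._open[path][0]
        if len(self._open) >= self.max_open:
            # Evict the entry that has gone unused the longest.
            _, (_, old_file) = self._open.popitem(last=False)
            old_file.close()
        # Create the file on first use, append on every later re-open.
        mode = 'a' if path in self._seen else 'w'
        csv_file = open(path, mode, newline='')
        self._seen.add(path)
        csv_writer = csv.writer(csv_file, **self.csv_kwargs)
        self._open[path] = (csv_writer, csv_file)
        return csv_writer

    def close(self):
        """Close every file that is still open."""
        for _, csv_file in self._open.values():
            csv_file.close()
        self._open.clear()
```

In the main loop it would be used as cache.writer(save_file_path).writerow(line_split) inside a "with LRUWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as cache:" block. If the keys in the input are effectively in random order, LRU is unlikely to beat random eviction by much.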
Regarding "python - problems with CSV files when the file is large", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/53948853/