python - Problems writing CSV files when the input file is large

Tags: python file csv sorting

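I have a large tsv file (~2.5 GB). I iterate over each line, where each line has 6 tab-separated fields. I take the first field of each line and append the line to a csv file named after that first field. The goal is to end up with a set of csv files that sort the lines of the master tsv by that field.
This works on small files, but when I run it on the large file the IPython console never finishes. The file I am saving to looks like it is being filled, but when I open it, nothing is shown.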

import csv

file_path = ".../master.tsv"

with open(file_path, 'r') as masterfile:
    for line in masterfile:
        line_split = line.split("|")
        cik = line_split[0].zfill(10)

        save_path = ".../data-sorted/"
        save_path += cik + ".csv"

        with open(save_path, 'a') as savefile:
            wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
            wr.writerow(line_split)

Best answer

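Your code is very inefficient because it opens an output file and appends to it for every single line of the input. With an input file that large, this adds up to an enormous number of open and close operations, and the operating-system calls they require are relatively slow.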
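To make that cost concrete, here is a minimal sketch that writes the same rows twice: once reopening the file for every row, as the loop in the question does, and once through a single open. The function names and the temporary files are made up for illustration. Both variants produce identical output; only the number of open/close cycles differs.

```python
import csv
import os
import tempfile


def write_reopening(path, rows):
    """Open and close the file once per row (the pattern in the question)."""
    opens = 0
    for row in rows:
        with open(path, 'a', newline='') as f:
            csv.writer(f, quoting=csv.QUOTE_ALL).writerow(row)
        opens += 1
    return opens


def write_once(path, rows):
    """Open the file a single time and write every row through it."""
    with open(path, 'a', newline='') as f:
        csv.writer(f, quoting=csv.QUOTE_ALL).writerows(rows)
    return 1


def read_text(path):
    """Return the raw file contents so the two outputs can be compared."""
    with open(path, newline='') as f:
        return f.read()


rows = [[str(i), 'a', 'b'] for i in range(1000)]
with tempfile.TemporaryDirectory() as tmp:
    slow_path = os.path.join(tmp, 'slow.csv')
    fast_path = os.path.join(tmp, 'fast.csv')
    slow_opens = write_reopening(slow_path, rows)
    fast_opens = write_once(fast_path, rows)
    same_output = read_text(slow_path) == read_text(fast_path)

print(slow_opens, fast_opens, same_output)
```

Wrapping each function in `timeit` on your own machine should show how much of the running time goes into the repeated opens.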
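I also noticed at least one fragile line in your code:

save_path += cik + ".csv"

Appending with += only yields the right path because save_path is reset at the top of every iteration. If that reset ever moved outside the loop, save_path would just keep getting longer and longer, which is not what is needed.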
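One way to sidestep the issue is to build each path from an unchanging base directory instead of appending in place. A minimal sketch, where `build_save_path` and `BASE_DIR` are hypothetical names and the directory is a placeholder for the elided path in the question:

```python
import os

BASE_DIR = "data-sorted"  # Placeholder; the real directory is elided in the question.


def build_save_path(cik, base_dir=BASE_DIR):
    """Return the output path for one key without mutating any shared state."""
    return os.path.join(base_dir, cik.zfill(10) + ".csv")


paths = [build_save_path(cik) for cik in ("123", "4567")]
print(paths)
```

Because the function never modifies `base_dir`, calling it repeatedly cannot accumulate earlier file names in the path.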
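Regardless, here is something that should run considerably faster, although processing a file that large will probably still take a fair amount of time. It speeds things up by caching intermediate results: each output CSV file is opened, and its corresponding csv.writer object created, as rarely as possible. That happens the first time a file is needed, and again only if the file had to be closed because the cache reached its maximum size.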
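Note that the cache itself may consume a lot of memory, depending on how many unique csv output files there are and how many of them can be open at the same time, but using that memory is what makes it run faster. You will need to experiment and manually adjust the MAX_OPEN value to find the best trade-off between speed and memory use, while always staying below the operating system's limit on how many files may be open at once.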
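As a starting point for picking MAX_OPEN, the per-process limit can be queried on Unix-like systems with the standard `resource` module. The sketch below is only a hint, not part of the answer's code: `open_file_limit`, `suggest_max_open`, the fallback of 256, and the reserve of 32 descriptors are all assumptions made for illustration, and `resource` is not available on Windows.

```python
def open_file_limit(default=256):
    """Return the soft per-process limit on open file descriptors.

    Falls back to `default` (an arbitrary, conservative guess) on
    platforms without the Unix-only `resource` module, or when the
    limit is reported as unlimited.
    """
    try:
        import resource  # Unix only; not available on Windows.
    except ImportError:
        return default
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return default
    return soft


def suggest_max_open(limit, reserve=32):
    """Leave `reserve` descriptors for stdin/stdout, the input file, etc."""
    return max(1, limit - reserve)


limit = open_file_limit()
print("soft limit:", limit, "-> suggested MAX_OPEN:", suggest_max_open(limit))
```

The suggested value is an upper bound only; a smaller MAX_OPEN may still be the better choice if memory is tight.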
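Also note that it might be possible to make this even more efficient by choosing more intelligently which existing file entry to close, rather than just picking an open one at random. Whether that would really help, however, depends on whether the data in the input file has any advantageous grouping or other ordering.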
import csv
import os
import random

class CSVWriterCache(dict):
    """ Dict subclass to cache pairs of csv files and associated
        csv.writers. When a specified maximum number of them already
        exist, a random one is closed, but an entry for it is retained
        and marked "closed" so it can be re-opened in append mode
        later if it's ever referenced again. This limits the number of
        files open at any given time.
    """
    _CLOSED = None  # Marker to indicate that file has been seen before.

    def __init__(self, max_open, **kwargs):
        self.max_open = max_open
        self.cur_open = 0  # Number of currently opened csv files.
        self.csv_kwargs = kwargs  # keyword args for csv.writer.

    # Adding the next two non-dict special methods makes the class a
    # context manager which allows it to be used in "with" statements
    # to do automatic clean-up.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __getitem__(self, k):
        if k not in self:
            return self.__missing__(k)
        entry = self.get(k)  # dict.get() bypasses this method.
        if entry is self._CLOSED:  # Seen before: re-open in append mode.
            entry = self._open(k, 'a')
        return entry

    def __missing__(self, csv_file_path):
        """ Create a csv.writer corresponding to the file path and add it
            and the file to the cache.
        """
        return self._open(csv_file_path, 'w')

    def _open(self, csv_file_path, mode):
        """ Open the file in the given mode, closing a randomly chosen
            open file first if the limit has been reached, and cache the
            resulting (csv.writer, file) pair.
        """
        if self.cur_open >= self.max_open:  # Limit?
            # Randomly choose a cached entry that is still open (not
            # _CLOSED) and close its file. The entry for the file path is
            # left in the dictionary so it can be recognized as having
            # been seen before and be re-opened in append mode.
            open_keys = [key for key, entry in self.items()
                         if entry is not self._CLOSED]
            rand_entry = random.choice(open_keys)
            self.get(rand_entry)[1].close()
            self.cur_open -= 1
            self[rand_entry] = self._CLOSED  # Mark as previously seen but closed.

        csv_file = open(csv_file_path, mode, newline='')
        csv_writer = csv.writer(csv_file, **self.csv_kwargs)
        self.cur_open += 1

        # Add pair to cache.
        super().__setitem__(csv_file_path, (csv_writer, csv_file))
        return csv_writer, csv_file

    # Added, non-standard dict method.
    def close(self):
        """ Close all the opened files in the cache and clear it out. """
        for key, entry in self.items():
            if entry is not self._CLOSED:
                entry[1].close()
                self[key] = self._CLOSED  # Not strictly necessary.
                self.cur_open -= 1  # For sanity check at end.
        self.clear()
        assert(self.cur_open == 0)  # Sanity check.

if __name__ == '__main__':
    file_path = "./master.tsv"
    save_path = "./data-sorted"
    MAX_OPEN  = 1000  # Number of opened files allowed (max is OS-dependent).
#    MAX_OPEN  = 2  # Use small value for testing.

    # Create output directory if it does not exist.
    if os.path.exists(save_path):
        if not os.path.isdir(save_path):
            raise RuntimeError(
                "Path {!r} exists, but isn't a directory".format(save_path))
    else:
        print('Creating directory: {!r}'.format(save_path))
        os.makedirs(save_path)

    # Process the input file using a cache of csv.writers.
    with open(file_path, 'r') as masterfile, \
         CSVWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as csv_writer_cache:
        for line in masterfile:
            line_split = line.rstrip().split("|")
            cik = line_split[0].zfill(10)

            save_file_path = os.path.join(save_path, cik + ".csv")
            writer = csv_writer_cache[save_file_path][0]
            writer.writerow(line_split)

    print('{!r} file processing completed'.format(os.path.basename(file_path)))
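
As for choosing more deliberately which file to close, one common policy is least-recently-used (LRU) eviction. The following is a minimal sketch of that idea and not part of the code above: `LRUFileCache` and its methods are hypothetical names, and recency is tracked with `collections.OrderedDict`. Whether it beats random eviction depends on how the keys are ordered in the input.

```python
import collections
import csv
import os
import tempfile


class LRUFileCache:
    """Keep at most `max_open` csv files open, closing the least recently used."""

    def __init__(self, max_open, **csv_kwargs):
        self.max_open = max_open
        self.csv_kwargs = csv_kwargs
        self._open = collections.OrderedDict()  # path -> (writer, file)
        self._seen = set()  # Paths opened at least once during this run.

    def writer(self, path):
        if path in self._open:
            self._open.move_to_end(path)  # Mark as most recently used.
            return self._open[path][0]
        if len(self._open) >= self.max_open:
            _, (_, old_file) = self._open.popitem(last=False)  # Evict the LRU entry.
            old_file.close()
        mode = 'a' if path in self._seen else 'w'  # Truncate only on first use.
        csv_file = open(path, mode, newline='')
        self._seen.add(path)
        self._open[path] = (csv.writer(csv_file, **self.csv_kwargs), csv_file)
        return self._open[path][0]

    def close(self):
        for _, csv_file in self._open.values():
            csv_file.close()
        self._open.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


# Small demonstration with made-up keys and a limit of two open files.
with tempfile.TemporaryDirectory() as tmp:
    with LRUFileCache(2, quoting=csv.QUOTE_ALL) as cache:
        for key in ["a", "b", "a", "c", "b", "a"]:
            cache.writer(os.path.join(tmp, key + ".csv")).writerow([key, "x"])
    counts = {}
    for key in "abc":
        with open(os.path.join(tmp, key + ".csv"), newline='') as f:
            counts[key] = sum(1 for _ in csv.reader(f))

print(counts)
```

In the demonstration every file is evicted and re-opened at least once, yet no rows are lost, because re-opened files are appended to rather than truncated.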
