python - Problems with CSV files when the file is large

Tags: python file csv sorting

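I have a large tsv file (~2.5 GB). I iterate over its lines, each of which has 6 fields. I take the first field of each line and append the line to a csv file named after that first field. The goal is to end up with the rows of the master tsv sorted into csv files by that field.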

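This works on a small file, but when I run it on the large file the IPython console never finishes. The file I'm saving to looks as if it is being filled, but when I open it nothing shows up.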

import csv

file_path = ".../master.tsv"

with open(file_path, 'r') as masterfile:
    for line in masterfile:
        line_split = line.split("|")
        cik = line_split[0].zfill(10)

        save_path = ".../data-sorted/"
        save_path += cik + ".csv"

        with open(save_path, 'a') as savefile:
            wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
            wr.writerow(line_split)  # Append the split fields as one row.


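One thing to check in the question's code is this line:

save_path += cik + ".csv"

If the base path is ever assigned only once, outside the loop, this just keeps making save_path longer and longer, which is not what's needed.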
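
A minimal sketch of that problem, assuming the base path is assigned once before the loop. The directory name and the sample `cik` values are made up for illustration. It contrasts extending the same string with `+=` against building a fresh path for each row with `os.path.join`:

```python
import os

base_dir = "data-sorted"             # hypothetical output directory
ciks = ["0000001234", "0000005678"]  # made-up sample keys

# Problem pattern: the path is set once and then extended for every row.
save_path = base_dir + "/"
accumulated = []
for cik in ciks:
    save_path += cik + ".csv"
    accumulated.append(save_path)

# Safer pattern: derive a fresh path from the unchanged base each time.
fresh = [os.path.join(base_dir, cik + ".csv") for cik in ciks]

print(accumulated[-1])  # both file names glued together
print(fresh[-1])        # only the second file name
```

Keeping the base directory in its own variable and never mutating it avoids the issue regardless of where the assignment sits.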

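In any case, here is something that should run faster, although processing a file this large may still take quite a while. The question's code opens and closes an output file for every input line; the version below speeds things up by caching the open output csv files together with their csv.writer objects. Each file is opened, and its csv.writer created, as rarely as possible: the first time it is needed, and again only if it had to be closed because the cache reached its maximum size.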

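Note that the cache itself may consume a lot of memory, depending on how many unique csv output files there are and how many of them can be open at the same time, but using more memory is what lets it run faster. You will need to experiment and tune the MAX_OPEN value by hand to find the best trade-off between speed and memory use, all while staying below your OS's limit on the number of files that may be open at once.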
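
On Unix-like systems the per-process limit on open files can be read with the standard `resource` module, which may give a starting point for MAX_OPEN. This is only a rough sketch: the function name `suggest_max_open` and the `reserve` and `fallback` values are hypothetical, and the `resource` module is not available on Windows.

```python
def suggest_max_open(reserve=64, fallback=256):
    """Return a conservative cap on simultaneously open output files.

    `reserve` leaves room for stdin/stdout, the input file, and so on.
    Both default values are illustrative, not recommendations.
    """
    try:
        import resource  # Unix-only standard library module.
    except ImportError:
        return fallback  # e.g. on Windows: fall back to a guess.

    soft_limit, _hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft_limit == resource.RLIM_INFINITY:
        return fallback  # No numeric limit reported.
    return max(1, soft_limit - reserve)

print(suggest_max_open())
```

The value it prints is an upper bound to experiment under, not a tuned setting; the best MAX_OPEN still has to be found by measurement.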


import csv
import os
import random

class CSVWriterCache(dict):
    """ Dict subclass to cache pairs of csv files and associated
        csv.writers. When a specified maximum number of them already
        exist, a random one is closed, but an entry for it is retained
        and marked "closed" so it can be re-opened in append mode
        later if it's ever referenced again. This limits the number of
        files open at any given time.
    """
    _CLOSED = None  # Marker to indicate that the file has been seen before.

    def __init__(self, max_open, **kwargs):
        self.max_open = max_open
        self.cur_open = 0  # Number of currently opened csv files.
        self.csv_kwargs = kwargs  # keyword args for csv.writer.

    # Adding the next two non-dict special methods makes the class a
    # context manager which allows it to be used in "with" statements
    # to do automatic clean-up.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __getitem__(self, k):
        # dict.get() bypasses __getitem__, so it returns the raw entry.
        entry = self.get(k)
        if entry is not self._CLOSED:
            return entry  # File is still open: reuse the cached pair.
        # A path never seen before is created ('w'); one that was seen
        # and later closed is re-opened in append mode ('a').
        return self.__missing__(k, 'a' if k in self else 'w')

    def __missing__(self, csv_file_path, mode='w'):
        """ Create a csv.writer corresponding to the file path and add it
            and the file to the cache.
        """
        if self.cur_open == self.max_open:  # Limit?
            # Randomly choose a cached entry with a previously seen
            # file path that is still open (not _CLOSED). The associated
            # file is then closed, but the entry for the file path is
            # left in the dictionary so it can be recognized as having
            # been seen before and be re-opened in append mode.
            keys = tuple(self.keys())
            while True:
                rand_entry = random.choice(keys)
                if self.get(rand_entry) is not self._CLOSED:
                    break
            csv_writer, csv_file = self.get(rand_entry)
            csv_file.close()
            self.cur_open -= 1
            self[rand_entry] = self._CLOSED  # Mark as previously seen but closed.

        csv_file = open(csv_file_path, mode, newline='')
        csv_writer = csv.writer(csv_file, **self.csv_kwargs)
        self.cur_open += 1

        # Add pair to cache.
        super().__setitem__(csv_file_path, (csv_writer, csv_file))
        return csv_writer, csv_file

    # Added, non-standard dict method.
    def close(self):
        """ Close all the opened files in the cache and clear it out. """
        for key, entry in self.items():
            if entry is not self._CLOSED:
                entry[1].close()  # entry is a (csv_writer, csv_file) pair.
                self[key] = self._CLOSED  # Not strictly necessary.
                self.cur_open -= 1  # For sanity check at end.
        self.clear()
        assert self.cur_open == 0  # Sanity check.

if __name__ == '__main__':
    file_path = "./master.tsv"
    save_path = "./data-sorted"
    MAX_OPEN  = 1000  # Number of opened files allowed (max is OS-dependent).
#    MAX_OPEN  = 2  # Use small value for testing.

    # Create output directory if it does not exist.
    if os.path.exists(save_path):
        if not os.path.isdir(save_path):
            raise RuntimeError(
                "Path {!r} exists, but isn't a directory".format(save_path))
    else:
        print('Creating directory: {!r}'.format(save_path))
        os.makedirs(save_path)

    # Process the input file using a cache of csv.writers.
    with open(file_path, 'r') as masterfile, \
         CSVWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as csv_writer_cache:
        for line in masterfile:
            line_split = line.rstrip().split("|")
            cik = line_split[0].zfill(10)

            save_file_path = os.path.join(save_path, cik + ".csv")
            writer = csv_writer_cache[save_file_path][0]
            writer.writerow(line_split)

    print('{!r} file processing completed'.format(os.path.basename(file_path)))
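
If the OS limit on open files turns out to be very low, a variation on the same idea of opening each output file as rarely as possible is to buffer rows in memory per key and write them out in batches. This is a different technique from the cache above, shown only as a sketch: the function name `split_by_first_field` and the `flush_rows` default are hypothetical, and the sample data is made up.

```python
import csv
import os
import tempfile
from collections import defaultdict

def split_by_first_field(lines, out_dir, delimiter="|", flush_rows=100_000):
    """Group delimited lines into per-key csv files under out_dir."""
    os.makedirs(out_dir, exist_ok=True)
    buffers = defaultdict(list)  # key -> rows not yet written
    seen = set()                 # keys already written during this run
    buffered = 0

    def flush():
        nonlocal buffered
        for key, rows in buffers.items():
            # Truncate on the first write of this run, append afterwards.
            mode = "a" if key in seen else "w"
            path = os.path.join(out_dir, key + ".csv")
            with open(path, mode, newline="") as f:
                csv.writer(f, quoting=csv.QUOTE_ALL).writerows(rows)
            seen.add(key)
        buffers.clear()
        buffered = 0

    for line in lines:
        fields = line.rstrip("\n").split(delimiter)
        buffers[fields[0].zfill(10)].append(fields)
        buffered += 1
        if buffered >= flush_rows:
            flush()
    flush()  # Write whatever is left at the end.
    return len(seen)

# Tiny demonstration with made-up rows and a small batch size.
sample = ["12|a|b\n", "7|c|d\n", "12|e|f\n"]
with tempfile.TemporaryDirectory() as tmp:
    n_files = split_by_first_field(sample, tmp, flush_rows=2)
    with open(os.path.join(tmp, "0000000012.csv"), newline="") as f:
        rows_for_12 = list(csv.reader(f))
print(n_files, rows_for_12)
```

Only one output file is open at any moment, at the cost of holding up to `flush_rows` rows in memory, so the trade-off between speed and memory is tuned through that one value instead of MAX_OPEN.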
