python - 如何并行处理 CSV 文件?

标签 python csv multiprocessing

我机器上的一个目录中有几千个 CSV 文件,需要根据我制定的正则表达式对其进行验证。 path_to_validator 指向 Scala 脚本,该脚本通过命令行上的 windows .bat 文件运行。它读取正则表达式和 csv 文件并给它一个通过/失败等级,该等级被打印到 output.txt。

限制是这个 Scala 脚本将目录作为参数,而不是 Python 列表,因此我不能在进程之间如此轻松地拆分工作负载。我可以将每个进程的文件移动到一个临时目录,但项目的详细信息是这样的,理想情况下,我部署的程序不需要对 CSV 文件的写入权限。


这是代码:

with open("output.txt", 'w') as output:
    for filename in os.listdir(path_to_csv_folder):
        print("Processing file " + str(current_file_count) + "/" + str(TOTAL_FILE_COUNT), end='\r')

        output.write(filename + ': ')
        validator = subprocess.Popen([path_to_validator, path_to_csv_folder + filename, path_to_csv_schema, "-x",
                                      CSV_ENCODING, "-y", CSV_SCHEMA_ENCODING], stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
        result = validator.stdout.read()
        output.write(result.decode('windows-1252'))

        current_file_count += 1

问题是它需要 1 小时 30 分钟以上,而只使用了大约 20% 的 CPU。这应该是并行化加速的明显候选者。该目录有 5000+ 个 CSV 文件,它们都需要处理。如何将工作负载分配给 4 个不同的进程以利用所有 CPU 能力?

这是我实际编写的代码:

"""
Command line API to CSV validator using Scala implementation from:
http://digital-preservation.github.io/csv-validator/#toc7
"""

PATH_TO_VALIDATOR = r"C:\prog\csv\csv-validator-cmd-1.2-RC2\bin\validate.bat"
PATH_TO_CSV_FOLDER = r"C:\prog\csv\CSVFiles"
PATH_TO_CSV_SCHEMA = r"C:\prog\csv\ocr-schema.csvs"
# Set defaults
CSV_ENCODING = "windows-1252"
CSV_SCHEMA_ENCODING = "UTF-8"


def open_csv(CSV_LIST):
    import subprocess

    # To be used to display a simple progress indicator
    TOTAL_FILE_COUNT = len(CSV_LIST)
    current_file_count = 1

    with open("output.txt", 'w') as output:
        for filename in CSV_LIST:
            print("Processing file " + str(current_file_count) + "/" + str(TOTAL_FILE_COUNT))

            output.write(filename + ': ')
            validator = subprocess.Popen(
                [PATH_TO_VALIDATOR, PATH_TO_CSV_FOLDER + "/" + filename, PATH_TO_CSV_SCHEMA, "--csv-encoding",
                 CSV_ENCODING, "--csv-schema-encoding", CSV_SCHEMA_ENCODING, '--fail-fast', 'true'], stdout=subprocess.PIPE)
            result = validator.stdout.read()
            output.write(result.decode('windows-1252'))

            current_file_count += 1


# Split a list into n sublists of roughly equal size
def split_list(alist, wanted_parts=1):
    length = len(alist)
    return [alist[i * length // wanted_parts: (i + 1) * length // wanted_parts]
            for i in range(wanted_parts)]


if __name__ == '__main__':
    import argparse
    import multiprocessing
    import os

    parser = argparse.ArgumentParser(description="Command line API to Scala CSV validator")
    parser.add_argument('-pv', '--PATH_TO_VALIDATOR', help="Specify the path to csv-validator-cmd/bin/validator.bat",
                        required=True)
    parser.add_argument('-pf', '--PATH_TO_CSV_FOLDER', help="Specify the path to the folder containing the csv files "
                                                            "you want to validate", required=True)
    parser.add_argument('-ps', '--PATH_TO_CSV_SCHEMA', help="Specify the path to CSV schema you want to use to "
                                                            "validate the given files", required=True)

    parser.add_argument('-cenc', '--CSV_ENCODING', help="Optional parameter to specify the encoding used by the CSV "
                                                        "files. Choose UTF-8 or windows-1252. Default windows-1252")
    parser.add_argument('-csenc', '--CSV_SCHEMA_ENCODING', help="Optional parameter to specify the encoding used by "
                                                                "the CSV Schema. Choose UTF-8 or windows-1252. "
                                                                "Default UTF-8")

    args = vars(parser.parse_args())

    if args['CSV_ENCODING'] is not None:
        CSV_ENCODING = args['CSV_ENCODING']

    if args['CSV_SCHEMA_ENCODING'] is not None:
        CSV_SCHEMA_ENCODING = args['CSV_SCHEMA_ENCODING']

    PATH_TO_VALIDATOR = args["PATH_TO_VALIDATOR"]
    PATH_TO_CSV_SCHEMA = args["PATH_TO_CSV_SCHEMA"]
    PATH_TO_CSV_FOLDER = args["PATH_TO_CSV_FOLDER"]

    CPU_COUNT = multiprocessing.cpu_count()

    split_csv_directory = split_list(os.listdir(args["PATH_TO_CSV_FOLDER"]), wanted_parts=CPU_COUNT)

    # Spawn a Process for each CPU on the system
    for csv_list in split_csv_directory:
        p = multiprocessing.Process(target=open_csv, args=(csv_list,))
        p.start()

请让我知道我的代码中的任何陷阱。

最佳答案

看看This introductionmultiprocessing package .

例如,尝试:

import multiprocessing as mp
import os

def process_csv(csv):
    % process the csv
    return {csv: collected_debug_information}

pool = mp.Pool(processes=4)
results = pool.map(process_csv, os.listdir(path_to_csv_folder))

使用返回的字典,您可以查看结果以评估一些解析错误等。它将是一个以 csv 名称作为键的字典列表。

也是一个很好的包是 joblib ,也看看它,在引擎盖下它使用多处理包。

关于python - 如何并行处理 CSV 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48404125/

相关文章:

python - 多处理:如何在列表上使用 pool.map 并使用参数函数?

python - 如何在 Google Colaboratory 中使用 IBM python 模块 "pixiedust"?

python - 如果Python中不存在全局变量,如何创建它?

ios - 使用 Swift 从 CSV 初始化 CoreData

python - 将 dict 写入 csv 文件,其中键不按字母顺序排列

python 多处理挂起,潜在的队列内存错误?

python - 是否可以从 Scrapy spider 运行另一个 spider?

python - Fortran - Cython 工作流程

python - 在 print() 方法中格式化小数位?

perl - 需要帮助迭代特定格式的文件