python - Updating a pandas DataFrame between processes

Tags: python pandas multiprocessing

I have a (somewhat) minimal multiprocessing test example in which the expected output is a shared pandas DataFrame, but the shared DataFrame is never updated. In the example, 10 text files are first created for testing purposes, each containing a single integer that corresponds to its file name. The worker function is given each of the 10 file paths along with a namespace for sharing the DataFrame; it then analyzes each file and enters the "result" into the appropriate place in the DataFrame (for testing purposes, the result is the sum of the integer value given in the file and each of the constants in the list named "constants").

Any ideas on getting the DataFrame to update after each task, and on making the variable sharing work? Am I making a simple mistake? Several posts suggest this way of sharing a DataFrame, but they generally have a simple structure, and something about my structure is making the sharing fail. For example, I tried to follow the approach given here: How to share pandas DataFrame object between processes?

from multiprocessing import Manager
import multiprocessing as mp
import pandas as pd
import os

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

ct = 1

for filename in test_filenames:
    with open(test_folder + '\\' + filename + '.txt', 'w') as f:
        f.write(str(ct))
    f.close()    

    ct += 1

def worker_function(file_paths, ns):

    dataframe = ns.df

    for file_path in file_paths:

        with open(file_path) as f:
            value = int(f.readline())
        f.close()

        filename = file_path.split( '\\' )[-1]    
        for constant in constants:
            result = value + constant 
            dataframe.at[constant, filename] = result

    ns.df = dataframe

def run_parallel(file_paths, number_procs, ns):    
    procs = []
    for i in range(number_procs):
        paths_load = file_paths[i::number_procs]
        proc = mp.Process(target=worker_function, args=(paths_load, ns))
        procs.append(proc)
        procs[i].start()
    for p in procs:
        p.join()

if __name__ == '__main__':        
    num_procs = 4
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=files, index=constants)   
    mgr = Manager()
    ns = mgr.Namespace()
    ns.df = output_df

    run_parallel(file_paths, num_procs, ns)

    output_df = ns.df
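
For context on why the shared DataFrame never updates: a Manager.Namespace proxy sends a pickled copy of ns.df across the manager connection on every attribute access, so dataframe.at[...] = ... mutates only a worker-local copy, and the final ns.df = dataframe write-backs from concurrent workers overwrite one another. A minimal standalone sketch of those copy semantics (illustrative only, not part of the test code above):

from multiprocessing import Manager
import pandas as pd

if __name__ == '__main__':
    mgr = Manager()
    ns = mgr.Namespace()
    ns.df = pd.DataFrame({'a': [0]})

    local = ns.df            # arrives as a pickled copy, not a view
    local.at[0, 'a'] = 99    # mutates only the local copy
    print(ns.df.at[0, 'a'])  # still 0: the shared frame is unchanged

    ns.df = local            # write-back works, but concurrent workers
    print(ns.df.at[0, 'a'])  # doing read-modify-write race: the last
                             # writer wins and other results are lost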

*** I have edited the title to reflect a solution that no longer uses a namespace. I took the accepted answer and modified it (below) to use as little code as possible and to skip exception handling. If you want multiprocessing instead, you can import ProcessPoolExecutor (a sketch of that swap follows the snippet below).

from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import os

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

ct = 1

for filename in test_filenames:
    with open(test_folder + '\\' + filename + '.txt', 'w') as f:
        f.write(str(ct))

    ct += 1

def worker_function(file_path):

    with open(file_path) as f:
        value = int(f.readline())

    result_list = []
    filename = file_path.split( '\\' )[-1]    
    result_list.append(filename)
    for constant in constants:
        result = value + constant
        result_list.append(result)

    return result_list

if __name__ == '__main__':

    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    output_df = pd.DataFrame(columns=constants, index=files)

    with ThreadPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}

        for future in as_completed(pool):
            worker_result = future.result()
            output_df.loc[worker_result[0]] = worker_result[1:]
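
As the edit note above mentions, swapping in a process pool only requires changing the executor; a minimal sketch of that variant, assuming the worker_function, file_paths, and output_df defined in the snippet above (with processes, the worker must live at module top level so it can be pickled, as it does here):

from concurrent.futures import ProcessPoolExecutor, as_completed

if __name__ == '__main__':
    # identical submit/collect loop; only the executor class changes
    with ProcessPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}

        for future in as_completed(pool):
            worker_result = future.result()
            output_df.loc[worker_result[0]] = worker_result[1:]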

Best Answer

The concurrent.futures module is helpful for workflows in which you are CPU- or I/O-bound by a parallelizable data lookup or processing step.

For your case it should look like the following. I am not on Windows, so I did not try to recreate the file names to test it, but I hope the structure gives you a sense of the pattern. Note that I am using multiple threads rather than processes, because the worker function is mainly doing I/O rather than computation.

from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import pandas as pd

test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]

#ct = 1

def file_counter(ct=1):
    for filename in test_filenames:
        with open(test_folder + '\\' + filename + '.txt', 'w') as f:
            f.write(str(ct))
        # no need to use f.close() with a context manager
        ct += 1

def worker_function(file_path):
    result_list = []
    with open(file_path) as f:
        value = int(f.readline())
    # no need to use f.close() with a context manager
    filename = file_path.split( '\\' )[-1]    
    for constant in constants:
        result = value + constant
        result_list.append((constant, filename, result))
    return result_list


if __name__ == '__main__':
    file_counter() # keep execution below the if...main
    files = os.listdir(test_folder)
    file_paths = [test_folder + '\\' + file for file in files]
    dataframe_collection = []

    # for I/O you should prefer threads over processes
    with ThreadPoolExecutor(max_workers=4) as executor:
        pool = {executor.submit(worker_function, p): p for p in file_paths}

        for future in as_completed(pool):
            try:
                # future.result() re-raises any exception the worker raised,
                # so catch it here instead of testing the return value
                worker_result = future.result()
            except Exception:  # choose your own exception types
                # handle the exception
                pass
            else:
                # worker_result is a list of (constant, filename, result)
                # tuples, so build a three-column frame for this worker
                output_df = pd.DataFrame(data=worker_result,
                                         columns=['constant', 'filename', 'result'])
                dataframe_collection.append(output_df)

    # now concatenate all the DataFrames
    single_df = pd.concat(dataframe_collection)
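
If you still want the constants-by-filenames table the question originally targeted, the concatenated long-format frame can be reshaped; a one-line sketch, assuming the three column names used in the loop above:

# pivot the (constant, filename, result) rows into a wide table with
# one row per constant and one column per file
wide_df = single_df.pivot(index='constant', columns='filename', values='result')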

Regarding python - Updating a pandas DataFrame between processes, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55502519/
