我有一个(某种程度上)最小的多处理测试示例,其中预期输出是共享的 Pandas 数据帧。但是,共享数据框永远不会更新。在我的示例中,首先创建 10 个文本文件用于测试目的,每个文件都包含与文件名相对应的单个整数。为工作函数提供 10 个文件路径和用于共享数据帧的命名空间中的每一个,然后它分析每个文件并将“结果”输入到数据帧中的适当位置(出于测试目的,该结果是整数的总和)文件中给出的值以及列表中的每个常量称为“常量”)。
关于在每个任务之后更新数据框以及使变量共享发挥作用有什么想法吗?我犯了一个简单的错误吗?有几篇文章提出了这种共享数据框的方法,但它们通常具有简单的结构,而我的结构中的某些内容导致共享失败。例如,我尝试遵循此处给出的方法:How to share pandas DataFrame object between processes?
from multiprocessing import Manager
import multiprocessing as mp
import pandas as pd
import os
test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]
ct = 1
for filename in test_filenames:
with open(test_folder + '\\' + filename + '.txt', 'w') as f:
f.write(str(ct))
f.close()
ct += 1
def worker_function(file_paths, ns):
dataframe = ns.df
for file_path in file_paths:
with open(file_path) as f:
value = int(f.readline())
f.close()
filename = file_path.split( '\\' )[-1]
for constant in constants:
result = value + constant
dataframe.at[constant, filename] = result
ns.df = dataframe
def run_parallel(file_paths, number_procs, ns):
procs = []
for i in range(number_procs):
paths_load = file_paths[i::number_procs]
proc = mp.Process(target=worker_function, args=(paths_load, ns))
procs.append(proc)
procs[i].start()
for p in procs:
p.join()
if __name__ == '__main__':
num_procs = 4
files = os.listdir(test_folder)
file_paths = [test_folder + '\\' + file for file in files]
output_df = pd.DataFrame(columns=files, index=constants)
mgr = Manager()
ns = mgr.Namespace()
ns.df = output_df
run_parallel(file_paths, num_procs, ns)
output_df = ns.df
***我编辑了标题以反射(reflect)不再使用命名空间的解决方案。我采用了已接受的答案并对其进行了修改(如下)以使用尽可能少的代码并且不处理异常。如果您想要多处理,您可以导入 ProcessPoolExecutor。
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import os
test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]
ct = 1
for filename in test_filenames:
with open(test_folder + '\\' + filename + '.txt', 'w') as f:
f.write(str(ct))
ct += 1
def worker_function(file_path):
with open(file_path) as f:
value = int(f.readline())
result_list = []
filename = file_path.split( '\\' )[-1]
result_list.append(filename)
for constant in constants:
result = value + constant
result_list.append(result)
return result_list
if __name__ == '__main__':
files = os.listdir(test_folder)
file_paths = [test_folder + '\\' + file for file in files]
output_df = pd.DataFrame(columns=constants, index=files)
with ThreadPoolExecutor(max_workers=4) as executor:
pool = {executor.submit(worker_function, p): p for p in file_paths}
for future in as_completed(pool):
worker_result = future.result()
output_df.loc[worker_result[0]] = worker_result[1:]
最佳答案
concurrent.futures
模块对于因并行数据查找或处理步骤而受到 CPU 或 I/O 限制的工作流程很有帮助。
对于您的情况,它应该如下所示。我不在 Windows 上,所以我没有尝试重新创建文件名来测试它,但我希望该结构能让您了解该模式。请注意,我使用多个线程而不是进程,因为工作函数主要从事 I/O 而不是处理。
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import pandas as pd
test_folder = r'C:\test_files'
test_filenames = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']
constants = [10, 15, 30, 60, 1440]
#ct = 1
def file_counter(ct=1):
for filename in test_filenames:
with open(test_folder + '\\' + filename + '.txt', 'w') as f:
f.write(str(ct))
# no need to use f.close() with a context manager
ct += 1
def worker_function(file_path):
result_list = []
with open(file_path) as f:
value = int(f.readline())
# no need to use f.close() with a context manager
filename = file_path.split( '\\' )[-1]
for constant in constants:
result = value + constant
result_list.append((constant, filename, result))
return result_list
if __name__ == '__main__':
file_counter() # keep execution below the if...main
files = os.listdir(test_folder)
file_paths = [test_folder + '\\' + file for file in files]
dataframe_collection = []
# for I/O you should prefer threads over processes
with ThreadPoolExecutor(max_workers=4) as executor:
pool = {executor.submit(worker_function, p): p for p in file_paths}
for future in as_completed(pool):
worker_result = future.result()
if isinstance(worker_result, Exception): # choose your own exception types
# handle the exception
pass
else:
output_df = pd.DataFrame(data=worker_result, columns=files, index=constants)
dataframe_collection.append(output_df)
# now concatenate all the DataFrames
single_df = pd.concat(dataframe_collection)
关于python - 在进程之间更新 pandas 数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55502519/