简单的问题:我读过的所有教程都向您展示了如何使用 ipython.parallel 或多处理将并行计算的结果输出到列表(或最多是字典)。
您能给我指出一个使用任一库将计算结果输出到共享 pandas 数据帧的简单示例吗?
http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html - 本教程向您展示如何读取输入数据帧(下面的代码),但是我如何将 4 个并行计算的结果输出到一个数据帧?
import pandas as pd
import multiprocessing as mp
LARGE_FILE = "D:\\my_large_file.txt"
CHUNKSIZE = 100000 # processing 100,000 rows at a time
def process_frame(df):
# process data frame
return len(df)
if __name__ == '__main__':
reader = pd.read_table(LARGE_FILE, chunksize=CHUNKSIZE)
pool = mp.Pool(4) # use 4 processes
funclist = []
for df in reader:
# process each data frame
f = pool.apply_async(process_frame,[df])
funclist.append(f)
result = 0
for f in funclist:
result += f.get(timeout=10) # timeout in 10 seconds
print "There are %d rows of data"%(result)
最佳答案
您要求multiprocessing
(或其他Python并行模块)输出到它们不直接输出到的数据结构。如果您使用任何并行包中的Pool
,您最好获得一个列表(使用map
)或一个迭代器(使用imap
)。如果您使用多处理
中的共享内存,您也许能够将结果放入可以通过ctypes
通过指针访问的内存块中。
那么问题是,您能否将迭代器或共享内存块中的结果提取到 pandas.DataFrame
中?我认为答案是肯定的。是的你可以。但是,我认为我没有在教程中看到过这样做的简单示例......因为它做起来并不那么简单。
迭代器路线似乎不太可能,因为您需要让 numpy 来消化迭代器,而无需首先将结果作为列表拉回到 python 中。我会选择共享内存路线。我认为这应该为您提供一个 DataFrame
的输出,然后您可以在 multiprocessing
中使用它:
from multiprocessing import sharedctypes as sh
from numpy import ctypeslib as ct
import pandas as pd
ra = sh.RawArray('i', 4)
arr = ct.as_array(ra)
arr.shape = (2,2)
x = pd.DataFrame(arr)
然后您所要做的就是将数组的句柄传递给multiprocessing.Process
:
import multiprocessing as mp
p1 = mp.Process(target=doit, args=(arr[:1, :], 1))
p2 = mp.Process(target=doit, args=(arr[1:, :], 2))
p1.start()
p2.start()
p1.join()
p2.join()
然后,通过一些指针魔法,结果应该填充到您的 DataFrame
.
我将让您编写 doit
函数来根据需要操作数组。
编辑:这看起来是使用类似方法的一个很好的答案...... https://stackoverflow.com/a/22487898/2379433 。这似乎也有效:https://stackoverflow.com/a/27027632/2379433 .
关于python - 如何将python并行计算(ipython并行或多处理)的结果输出到pandas数据帧?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30381396/