在 Python 3.6 中,我并行运行多个进程,其中每个进程 ping 一个 URL 并返回一个 Pandas 数据帧。我想继续运行 (2+) 个进程,我创建了一个最小的代表性示例,如下所示。
我的问题是:
1) 我的理解是,由于我有不同的功能,我不能使用 Pool.map_async()
及其变体。那正确吗?我见过的唯一例子是重复相同的功能,比如 this answer .
2) 使此设置永久运行的最佳做法是什么?在我下面的代码中,我使用了一个 while
循环,我怀疑它不适合这个目的。
3) 我使用 Process
和 Manager
的方式是否最优?我使用 multiprocessing.Manager.dict()
作为共享字典来返回进程的结果。我在 this answer 的评论中看到在这里使用 Queue
是有意义的,但是 Queue
对象没有 `.dict()' 方法。所以,我不确定这将如何运作。
对于示例代码的任何改进和建议,我将不胜感激。
import numpy as np
import pandas as pd
import multiprocessing
import time
def worker1(name, t , seed, return_dict):
'''worker function'''
print(str(name) + 'is here.')
time.sleep(t)
np.random.seed(seed)
df= pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
return_dict[name] = [df.columns.tolist()] + df.values.tolist()
def worker2(name, t, seed, return_dict):
'''worker function'''
print(str(name) + 'is here.')
np.random.seed(seed)
time.sleep(t)
df = pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))
return_dict[name] = [df.columns.tolist()] + df.values.tolist()
if __name__ == '__main__':
t=1
while True:
start_time = time.time()
manager = multiprocessing.Manager()
parallel_dict = manager.dict()
seed=np.random.randint(0,1000,1) # send seed to worker to return a diff df
jobs = []
p1 = multiprocessing.Process(target=worker1, args=('name1', t, seed, parallel_dict))
p2 = multiprocessing.Process(target=worker2, args=('name2', t, seed+1, parallel_dict))
jobs.append(p1)
jobs.append(p2)
p1.start()
p2.start()
for proc in jobs:
proc.join()
parallel_end_time = time.time() - start_time
#print(parallel_dict)
df1= pd.DataFrame(parallel_dict['name1'][1:],columns=parallel_dict['name1'][0])
df2 = pd.DataFrame(parallel_dict['name2'][1:], columns=parallel_dict['name2'][0])
merged_df = pd.concat([df1,df2], axis=0)
print(merged_df)
最佳答案
答案 1(映射到多个函数)
你在技术上是正确的。 对于 map、map_async 和其他变体,您应该使用单个函数。
但是这个约束可以通过实现一个执行器,并将要执行的函数作为参数的一部分传递来绕过:
def dispatcher(args):
return args[0](*args[1:])
所以一个最小的工作示例:
import multiprocessing as mp
def function_1(v):
print("hi %s"%v)
return 1
def function_2(v):
print("by %s"%v)
return 2
def dispatcher(args):
return args[0](*args[1:])
with mp.Pool(2) as p:
tasks = [
(function_1, "A"),
(function_2, "B")
]
r = p.map_async(dispatcher, tasks)
r.wait()
results = r.get()
答案 2(日程安排)
我将从脚本中删除 while 并安排一个 cron 作业 ( on GNU/Linux ) ( on windows ),以便操作系统负责它的执行。
在 Linux 上,您可以运行 cronotab -e
并添加以下行以使脚本每 5 分钟运行一次。
*/5 * * * * python /path/to/script.py
答案 3(共享词典)
是但不是。
据我所知,使用管理器处理集合等数据是最好的方法。
对于数组或基本类型(int、floats、ecc),存在 Value
和 Array
which are faster .
A manager object returned by Manager() controls a server process which holds > Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, > RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and > Array.
Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
但您只需返回一个 Dataframe,因此不需要共享字典。
清理代码
使用之前的所有想法,代码可以重写为:
map 版
import numpy as np
import pandas as pd
from time import sleep
import multiprocessing as mp
def worker1(t , seed):
print('worker1 is here.')
sleep(t)
np.random.seed(seed)
return pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
def worker2(t , seed):
print('worker2 is here.')
sleep(t)
np.random.seed(seed)
return pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))
def dispatcher(args):
return args[0](*args[1:])
def task_generator(sleep_time=1):
seed = np.random.randint(0,1000,1)
yield worker1, sleep_time, seed
yield worker2, sleep_time, seed + 1
with mp.Pool(2) as p:
results = p.map(dispatcher, task_generator())
merged = pd.concat(results, axis=0)
print(merged)
如果 Dataframe 的连接过程是瓶颈,使用 imap 的方法可能会成为最优方法。
imap 版本
with mp.Pool(2) as p:
merged = pd.DataFrame()
for result in p.imap_unordered(dispatcher, task_generator()):
merged = pd.concat([merged,result], axis=0)
print(merged)
主要区别是在map情况下,程序先等待所有流程任务结束,然后拼接所有Dataframes。
而在 imap_unoredered 的情况下,任务一结束,Dataframe 就会与当前结果连接起来。
关于python-3.x - Python 多进程调度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59877768/