python-3.x - Python 多进程调度

标签 python-3.x multiprocessing queue scheduled-tasks

在 Python 3.6 中,我并行运行多个进程,其中每个进程 ping 一个 URL 并返回一个 Pandas 数据帧。我想继续运行 (2+) 个进程,我创建了一个最小的代表性示例,如下所示。

我的问题是:

1) 我的理解是,由于我有不同的功能,我不能使用 Pool.map_async() 及其变体。那正确吗?我见过的唯一例子是重复相同的功能,比如 this answer .

2) 使此设置永久运行的最佳做法是什么?在我下面的代码中,我使用了一个 while 循环,我怀疑它不适合这个目的。

3) 我使用 ProcessManager 的方式是否最优?我使用 multiprocessing.Manager.dict() 作为共享字典来返回进程的结果。我在 this answer 的评论中看到在这里使用 Queue 是有意义的,但是 Queue 对象没有 `.dict()' 方法。所以,我不确定这将如何运作。

对于示例代码的任何改进和建议,我将不胜感激。

import numpy as np
import pandas as pd
import multiprocessing
import time

def worker1(name, t , seed, return_dict):
    '''worker function'''
    print(str(name) + 'is here.')
    time.sleep(t)
    np.random.seed(seed)
    df= pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

def worker2(name, t, seed, return_dict):
    '''worker function'''
    print(str(name) + 'is here.')
    np.random.seed(seed)
    time.sleep(t)
    df = pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))

    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

if __name__ == '__main__':
    t=1
    while True:

        start_time = time.time()
        manager = multiprocessing.Manager()
        parallel_dict = manager.dict()
        seed=np.random.randint(0,1000,1) # send seed to worker to return a diff df
        jobs = []
        p1 = multiprocessing.Process(target=worker1, args=('name1', t, seed, parallel_dict))
        p2 = multiprocessing.Process(target=worker2, args=('name2', t, seed+1, parallel_dict))
        jobs.append(p1)
        jobs.append(p2)
        p1.start()
        p2.start()
        for proc in jobs:
            proc.join()
        parallel_end_time = time.time() - start_time
        #print(parallel_dict)
        df1= pd.DataFrame(parallel_dict['name1'][1:],columns=parallel_dict['name1'][0])
        df2 = pd.DataFrame(parallel_dict['name2'][1:], columns=parallel_dict['name2'][0])
        merged_df = pd.concat([df1,df2], axis=0)
        print(merged_df)

最佳答案

答案 1(映射到多个函数)

你在技术上是正确的。 对于 map、map_async 和其他变体,您应该使用单个函数。

但是这个约束可以通过实现一个执行器,并将要执行的函数作为参数的一部分传递来绕过:

def dispatcher(args):
    return args[0](*args[1:])

所以一个最小的工作示例:

import multiprocessing as mp

def function_1(v):
    print("hi %s"%v)
    return 1
    
def function_2(v):
    print("by %s"%v)
    return 2

def dispatcher(args):
    return args[0](*args[1:])

with mp.Pool(2) as p:
    tasks = [
        (function_1, "A"),
        (function_2, "B")
    ]
    r = p.map_async(dispatcher, tasks)
    r.wait()
    results = r.get()

答案 2(日程安排)

我将从脚本中删除 while 并安排一个 cron 作业 ( on GNU/Linux ) ( on windows ),以便操作系统负责它的执行。

在 Linux 上,您可以运行 cronotab -e 并添加以下行以使脚本每 5 分钟运行一次。

*/5 * * * * python /path/to/script.py

答案 3(共享词典)

是但不是。

据我所知,使用管理器处理集合等数据是最好的方法。 对于数组或基本类型(int、floats、ecc),存在 ValueArray which are faster .

documentation

A manager object returned by Manager() controls a server process which holds > Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, > RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and > Array.

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.

但您只需返回一个 Dataframe,因此不需要共享字典。

清理代码

使用之前的所有想法,代码可以重写为:

map 版

import numpy as np
import pandas as pd
from time import sleep
import multiprocessing as mp

def worker1(t , seed):
    print('worker1 is here.')
    sleep(t)
    np.random.seed(seed)
    return pd.DataFrame(np.random.randint(0,1000,8).reshape(2,4), columns=list('ABCD'))
     

def worker2(t , seed):
    print('worker2 is here.')
    sleep(t)
    np.random.seed(seed)
    return pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))

def dispatcher(args):
    return args[0](*args[1:])

def task_generator(sleep_time=1):
    seed = np.random.randint(0,1000,1)
    yield worker1, sleep_time, seed    
    yield worker2, sleep_time, seed + 1

with mp.Pool(2) as p:
    results = p.map(dispatcher, task_generator())
    merged = pd.concat(results, axis=0)
    print(merged)

如果 Dataframe 的连接过程是瓶颈,使用 imap 的方法可能会成为最优方法。

imap 版本

with mp.Pool(2) as p:
    merged = pd.DataFrame()
    for result in p.imap_unordered(dispatcher, task_generator()):
        merged = pd.concat([merged,result], axis=0)
    print(merged)

主要区别是在map情况下,程序先等待所有流程任务结束,然后拼接所有Dataframes。

而在 imap_unoredered 的情况下,任务一结束,Dataframe 就会与当前结果连接起来。

关于python-3.x - Python 多进程调度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59877768/

相关文章:

python - pywin32 Windows 服务不想发送请求状态

python - 按数据框中的一列进行分组,但将其中一些组汇总为一组

python - 进程池执行器 : TypeError: cannot pickle 'PyCapsule' object

c - 带指针的 C 数组队列

parallel-processing - 具有动态数量的并行消费者的 Kafka 工作队列

java - 如何在java中排队并调用实际方法(而不是立即评估)?

python - 条件分段函数

python-3.x - 使用 make install 在 Raspbian Raspberry Pi 3+ 中安装 OpenCV 4 时出现编译错误

c - 在信号量中排队 - 甚至可能吗?

python - 更新来自不同进程的相同实例变量