我正在努力弄清楚如何并行处理有向无环图。每个节点只有在其所有输入节点都已预先处理后才能“执行”。想象一个具有以下接口(interface)的 Task
类:
class Task(object):
result = None
def inputs(self):
''' List all requirements of the task. '''
return ()
def run(self):
pass
我想不出一种方法来处理可以表示的图形 通过这种结构与最大数量的 worker 异步 同时,除了一种方法。
我认为最好的处理方式是创建一个线程 对于每个任务,等待所有输入被处理。 但是,产卵 立即而不是连续地为每个任务创建一个线程(即当 任务已准备好处理)对我来说不是个好主意。
import threading
class Runner(threading.Thread):
def __init__(self, task):
super(Runner, self).__init__()
self.task = task
self.start()
def run(self):
threads = [Runner(r) for r in self.task.inputs()]
[t.join() for t in threads]
self.task.run()
有没有办法更理想地模仿这种行为?还有,这种做法 目前没有实现限制运行任务数量的方法 一次。
最佳答案
一旦项目准备好被处理,就让一个主线程将项目推送到队列中。然后让一群工作人员在队列中监听要处理的任务。 (Python 在 Queue
module 中提供了一个同步队列,在 Python 3 中重命名为小写的 queue
)。
master 首先创建一个从依赖关系到依赖任务的映射。每个没有任何依赖关系的任务都可以进入队列。每次任务完成时,master 使用字典找出有哪些依赖任务,如果现在满足所有依赖项,则将它们放入队列。
关于python - DAG的并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27407412/