python - DAG的并行处理

标签 python multithreading python-2.7 directed-acyclic-graphs

我正在努力弄清楚如何并行处理有向无环图。每个节点只有在其所有输入节点都已预先处理后才能“执行”。想象一个具有以下接口(interface)的 Task 类:

class Task(object):
    result = None
    def inputs(self):
        ''' List all requirements of the task. '''
        return ()
    def run(self):
        pass

我想不出一种方法来处理可以表示的图形 通过这种结构与最大数量的 worker 异步 同时,除了一种方法。

认为最好的处理方式是创建一个线程 对于每个任务,等待所有输入被处理。 但是,产卵 立即而不是连续地为每个任务创建一个线程(即当 任务已准备好处理)对我来说不是个好主意。

import threading
class Runner(threading.Thread):
    def __init__(self, task):
        super(Runner, self).__init__()
        self.task = task
        self.start()
    def run(self):
        threads = [Runner(r) for r in self.task.inputs()]
        [t.join() for t in threads]
        self.task.run()

有没有办法更理想地模仿这种行为?还有,这种做法 目前没有实现限制运行任务数量的方法 一次。

最佳答案

一旦项目准备好被处理,就让一个主线程将项目推送到队列中。然后让一群工作人员在队列中监听要处理的任务。 (Python 在 Queue module 中提供了一个同步队列,在 Python 3 中重命名为小写的 queue)。

master 首先创建一个从依赖关系到依赖任务的映射。每个没有任何依赖关系的任务都可以进入队列。每次任务完成时,master 使用字典找出有哪些依赖任务,如果现在满足所有依赖项,则将它们放入队列。

关于python - DAG的并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27407412/

相关文章:

python - Python 中的 JSON : How do I get specific parts of an array?

python - python 中的 JSON 生成和验证库

python - 在屏幕上移动线时 Pygame 屏幕撕裂

python - 如何使用线程来创建多个独立的可重用对象实例?

python - "ImportError: No module named pwd"但它存在

python - PDF 创建和在其中写入内容 - PyPDF2

python - 在 python 中计算 netcdf 文件几年的年度异常

c# - WPF C# - 从另一个线程更新进度条

c++ - 在另一个线程正在运行的情况下退出应用程序时出错

android - Python 与 Android 通信?