python - Python中的嵌套并行

标签 python python-2.7 parallel-processing

我正在尝试使用 Python 进行多处理器编程。以 Fibonacci 之类的分而治之算法为例。程序的执行流程会像树一样分支出来并并行执行。换句话说,我们有一个 nested parallelism 的例子。 .

在 Java 中,我使用线程池模式来管理资源,因为程序可能会非常快速地分支并创建太多短期线程。单个静态(共享)线程池可以通过 ExecutorService 实例化。

我希望 Pool 也一样,但似乎 Pool object is not to be globally shared .例如,使用 multiprocessing.Manager.Namespace() 共享 Pool 会导致错误。

pool objects cannot be passed between processes or pickled

我有一个两部分的问题:

  1. 我在这里缺少什么;为什么不应该在进程之间共享池?
  2. 在 Python 中实现嵌套并行的模式是什么?如果可能,保持递归结构,而不是用它来换取迭代。

from concurrent.futures import ThreadPoolExecutor

def fibonacci(n):
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

def main():
    global pool

    N = int(10)
    with ThreadPoolExecutor(2**N) as pool:
        print(fibonacci(N))

main()

Java

public class FibTask implements Callable<Integer> {

    public static ExecutorService pool = Executors.newCachedThreadPool();
    int arg;

    public FibTask(int n) {
        this.arg= n;
    }

    @Override
    public Integer call() throws Exception {
        if (this.arg > 2) { 
            Future<Integer> left = pool.submit(new FibTask(arg - 1));
            Future<Integer> right = pool.submit(new FibTask(arg - 2));
            return left.get() + right.get();
        } else {
            return 1;
        }

    } 

  public static void main(String[] args) throws Exception {
      Integer n = 14;
      Callable<Integer> task = new FibTask(n);
      Future<Integer> result =FibTask.pool.submit(task); 
      System.out.println(Integer.toString(result.get()));
      FibTask.pool.shutdown();            
  }    

}

我不确定这是否重要,但我忽略了“进程”和“线程”之间的区别;对我来说,它们都意味着“虚拟化处理器”。我的理解是,池的目的是共享“池”或资源。正在运行的任务可以向池发出请求。当并行任务在其他线程上完成时,可以回收这些线程并将其分配给新任务。禁止共享池对我来说没有意义,因此每个线程都必须实例化自己的新池,因为这似乎违背了线程池的目的。

最佳答案

1) What am I missing here; why shouldn't a Pool be shared between processes?

并非所有对象/实例都是可拾取/可序列化的,在这种情况下,池使用不可拾取的 threading.lock:

>>> import threading, pickle
>>> pickle.dumps(threading.Lock())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
[...]
  File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle lock objects

或更好:

>>> import threading, pickle
>>> from concurrent.futures import ThreadPoolExecutor
>>> pickle.dumps(ThreadPoolExecutor(1))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File 
[...]
"/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
      File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
        raise TypeError, "can't pickle %s objects" % base.__name__
    TypeError: can't pickle lock objects

如果你仔细想想,它是有道理的,锁是由操作系统管理的信号量原语(因为 python 使用 native 线程)。能够在 python 运行时中腌制和保存该对象状态实际上不会完成任何有意义的事情,因为它的真实状态由操作系统保存。

2) What is a pattern for implementing nested parallelism in Python? If possible, maintaining a recursive structure, and not trading it for iteration

现在,为了声望,我上面提到的所有内容都不适用于您的示例,因为您使用的是线程 (ThreadPoolExecutor) 而不是进程 (ProcessPoolExecutor),因此不必发生跨进程的数据共享。

您的 java 示例似乎更高效,因为您使用的线程池 (CachedThreadPool) 正在根据需要创建新线程,而 python 执行器实现是有界的,并且需要显式的最大线程数 (max_workers)。语言之间存在一些语法差异,这似乎也让你失望(python 中的静态实例本质上是任何没有明确限定范围的东西),但本质上,两个示例都会创建完全相同数量的线程以执行。例如,这是一个在 python 中使用相当幼稚的 CachedThreadPoolExecutor 实现的示例:

from concurrent.futures import ThreadPoolExecutor

class CachedThreadPoolExecutor(ThreadPoolExecutor):
    def __init__(self):
        super(CachedThreadPoolExecutor, self).__init__(max_workers=1)

    def submit(self, fn, *args, **extra):
        if self._work_queue.qsize() > 0:
            print('increasing pool size from %d to %d' % (self._max_workers, self._max_workers+1))
            self._max_workers +=1

        return super(CachedThreadPoolExecutor, self).submit(fn, *args, **extra)

pool = CachedThreadPoolExecutor()

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

print(fibonacci(10))

性能调优:

我强烈建议查看 gevent因为它会在没有线程开销的情况下为您提供高并发性。情况并非总是如此,但您的代码实际上是 gevent 使用的典型代表。这是一个例子:

import gevent

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = gevent.spawn(fibonacci, n - 1)
    b = gevent.spawn(fibonacci, n - 2)
    return a.get()  + b.get()

print(fibonacci(10))

完全不科学,但在我的计算机上,上面的代码运行速度比其线程等效代码快 9 倍

我希望这会有所帮助。

关于python - Python中的嵌套并行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17038288/

相关文章:

python-2.7 - 根据一列值删除行

c++ - Cuda cudaGetTextureReference 返回 "invalid texture reference"

for-loop - Julia 并行编程 - 使所有工作人员都可以使用现有功能

python - 在列表列表中查找项目的索引

python - 如何在 Python 中显示来自数据框的词云

python - 在 matplotlib 中更改图形大小和图形格式

python - 返回最长的字母子串

c - MPI_Allgather 产生不一致的结果

python - 无法使用 Keras fit_generator 重现结果

python - 从 MySQL 5.5 升级到 5.7 后查询更频繁地遇到死锁