python - Creating objects in parallel with a multiprocessing Pool?

Tags: python parallel-processing multiprocessing pickle

Python 2.7.3

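I have a folder containing thousands of data files. Each data file is passed to a constructor and processed heavily. Right now I iterate over the files and process them sequentially: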

import os

class Foo:
    def __init__(self,file):
        self.bar = do_lots_of_stuff_with_numpy_and_scipy(file)

def do_lots_of_stuff_with_numpy_and_scipy(file):
    pass

def get_foos(dir):
    return [Foo(os.path.join(dir,file)) for file in os.listdir(dir)]

This works beautifully, but it is far too slow. I would like to do it in parallel. I tried:

import sys
from multiprocessing import Pool

def parallel_get_foos(dir):
    p = Pool()
    foos = p.map(Foo, [os.path.join(dir,file) for file in os.listdir(dir)])
    p.close()
    p.join()
    return foos

if __name__ == "__main__":
    foos = parallel_get_foos(sys.argv[1])

But it just fails with many errors like this one:

Process PoolWorker-7:
Traceback (most recent call last):
  File "/l/python2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/l/python2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/l/python2.7/lib/python2.7/multiprocessing/pool.py", line 99, in worker
    put((job, i, result))
  File "/l/python2.7/lib/python2.7/multiprocessing/queues.py", line 390, in put
    return send(obj)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I tried creating a function that returns the object, for example:

def get_foo(file):
    return Foo(file)

def parallel_get_foos(dir):
    ...
    foos = p.map(get_foo, [os.path.join(dir,file) for file in os.listdir(dir)])
    ...

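But, as expected, I get the same error.

I have read a good number of similar threads about problems like this one, but none of the solutions helped me. So I would appreciate any help!

EDIT:

Bakuriu correctly guessed that I was defining a non-top-level function inside the do_lots_of_stuff method. Specifically, I am doing the following: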

def fit_curve(data,degree):
    """Fits a least-square polynomial function to the given data."""
    sorted = data[data[:,0].argsort()].T
    coefficients = numpy.polyfit(sorted[0],sorted[1],degree)
    def eval(val,deg=degree):
        res = 0
        for coefficient in coefficients:
            res += coefficient*val**deg
            deg -= 1
        return res
    return eval

Is there any way to make this function picklable?

Best answer

What you are doing (at least, what you show in the example) actually works fine:

$mkdir TestPool
$cd TestPool/
$for i in {1..100}
> do
>     touch "test$i"
> done
$ls
test1    test18  test27  test36  test45  test54  test63  test72  test81  test90
test10   test19  test28  test37  test46  test55  test64  test73  test82  test91
test100  test2   test29  test38  test47  test56  test65  test74  test83  test92
test11   test20  test3   test39  test48  test57  test66  test75  test84  test93
test12   test21  test30  test4   test49  test58  test67  test76  test85  test94
test13   test22  test31  test40  test5   test59  test68  test77  test86  test95
test14   test23  test32  test41  test50  test6   test69  test78  test87  test96
test15   test24  test33  test42  test51  test60  test7   test79  test88  test97
test16   test25  test34  test43  test52  test61  test70  test8   test89  test98
test17   test26  test35  test44  test53  test62  test71  test80  test9   test99
$vi test_pool_dir.py
$cat test_pool_dir.py 
import os
import multiprocessing

class Foo(object):
    def __init__(self, fname):
        self.fname = fname   #or your calculations


def parallel_get_foos(directory):
    p = multiprocessing.Pool()
    foos = p.map(Foo, [os.path.join(directory, fname) for fname in os.listdir(directory)])
    p.close()
    p.join()
    return foos

if __name__ == '__main__':
    foos = parallel_get_foos('.')
    print len(foos)   #expected 101: 100 files plus this script

$python test_pool_dir.py 
101

Version information:

$python --version
Python 2.7.3
$uname -a
Linux giacomo-Acer 3.2.0-39-generic #62-Ubuntu SMP Thu Feb 28 00:28:53 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

My guess is that you are not doing exactly what you show in the code sample. For example, I get an error similar to yours when I do this:

>>> import pickle
>>> def test():
...     def test2(): pass
...     return test2
... 
>>> import multiprocessing
>>> p = multiprocessing.Pool()
>>> p.map(test(), [1,2,3])
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

This is to be expected, because the nested function itself cannot be pickled:

>>> pickle.dumps(test())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 748, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <function test2 at 0x7fad15fc2938>: it's not found as __main__.test2
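
Note that the traceback in the question comes from the worker's put((job, i, result)) call, so the object that fails to pickle there is the result being sent back to the parent process, not the callable passed to map. Below is a minimal sketch for locating the offending attribute, assuming the result object keeps its state in an ordinary __dict__. The names find_unpicklable_attrs and Result are hypothetical and exist only for this illustration:

```python
import pickle


def find_unpicklable_attrs(obj):
    """Return the names of instance attributes that pickle cannot serialize."""
    bad = []
    for name, value in sorted(vars(obj).items()):
        try:
            pickle.dumps(value)
        except (pickle.PicklingError, AttributeError, TypeError):
            # Python 2 raises PicklingError for nested functions;
            # Python 3 may raise AttributeError or PicklingError instead.
            bad.append(name)
    return bad


class Result(object):
    """Hypothetical stand-in for Foo: one plain attribute, one closure."""

    def __init__(self, fname):
        self.fname = fname
        coefficients = [1.0, 2.0]

        def evaluate(x):
            return coefficients[0] * x + coefficients[1]

        self.bar = evaluate  # nested function, cannot be looked up by name


print(find_unpicklable_attrs(Result("test1")))  # ['bar']
```

Running a check like this on one object in the parent process, before handing the work to a Pool, usually points at the problem faster than reading the worker tracebacks.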

The pickle documentation states:

The following types can be pickled:

  • None, True, and False
  • integers, long integers, floating point numbers, complex numbers
  • normal and Unicode strings
  • tuples, lists, sets, and dictionaries containing only picklable objects
  • functions defined at the top level of a module
  • built-in functions defined at the top level of a module
  • classes that are defined at the top level of a module
  • instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section The pickle protocol for details).
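
The last item in that list suggests one possible workaround when an object has to keep a closure internally: leave the closure out of the pickled state and rebuild it after unpickling. The following is only a sketch under that assumption. The class Curve and its method _build are hypothetical names, and the polynomial is evaluated with Horner's rule rather than with the loop from the question:

```python
import pickle


class Curve(object):
    """Hypothetical wrapper: uses a closure internally, pickles only the data."""

    def __init__(self, coefficients):
        self.coefficients = list(coefficients)
        self._evaluate = self._build()

    def _build(self):
        coefficients = self.coefficients

        def evaluate(x):
            # Horner's rule, highest-degree coefficient first.
            result = 0.0
            for c in coefficients:
                result = result * x + c
            return result

        return evaluate

    def __call__(self, x):
        return self._evaluate(x)

    def __getstate__(self):
        # Drop the closure; everything else in __dict__ is plain data.
        state = self.__dict__.copy()
        del state["_evaluate"]
        return state

    def __setstate__(self, state):
        # Restore the data, then recreate the closure from it.
        self.__dict__.update(state)
        self._evaluate = self._build()


curve = Curve([2.0, 0.0, 1.0])  # 2*x**2 + 1
restored = pickle.loads(pickle.dumps(curve))
print(restored(3.0))  # 19.0
```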

The documentation continues:

Note that functions (built-in and user-defined) are pickled by “fully qualified” name reference, not by value. This means that only the function name is pickled, along with the name of the module the function is defined in. Neither the function’s code, nor any of its function attributes are pickled. Thus the defining module must be importable in the unpickling environment, and the module must contain the named object, otherwise an exception will be raised.
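
Given these rules, a straightforward way to make the function returned by fit_curve picklable is to replace the nested function with an instance of a class defined at the top level of the module. This is a minimal sketch, not a drop-in replacement: Polynomial is a hypothetical name, and the coefficients are assumed to be ordered from the highest degree down, which is the order numpy.polyfit returns. The example uses hard-coded coefficients so that it runs without numpy:

```python
import pickle


class Polynomial(object):
    """Top-level callable that evaluates a polynomial.

    Coefficients are ordered from the highest degree down.
    """

    def __init__(self, coefficients):
        self.coefficients = [float(c) for c in coefficients]

    def __call__(self, val):
        # Same loop as the nested function in the question.
        deg = len(self.coefficients) - 1
        res = 0.0
        for coefficient in self.coefficients:
            res += coefficient * val ** deg
            deg -= 1
        return res


p = Polynomial([1, -3, 2])  # x**2 - 3*x + 2
print(p(4))  # 6.0

# Pool sends results between processes with pickle, so a successful
# round trip here is a reasonable check before using it with Pool.map.
restored = pickle.loads(pickle.dumps(p))
print(restored(4))  # 6.0
```

With a class like this, fit_curve would end with return Polynomial(coefficients) instead of returning the nested function, and the Foo instances holding it could then be sent back from the worker processes.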

Source: the original question on Stack Overflow: https://stackoverflow.com/questions/15760179/
