我希望 concurrent.futures.ProcessPoolExecutor.map()
调用由 2 个或更多参数组成的函数。在下面的示例中,我使用了 lambda
函数并将 ref
定义为大小与 numberlist
具有相同值的数组。
第一个问题:有更好的方法吗?在 numberlist 的大小可以是百万到十亿个元素的情况下,因此 ref 大小必须遵循 numberlist,这种方法不必要地占用宝贵的内存,我想避免。我这样做是因为我读到 map
函数将终止其映射,直到到达最短的数组末端。
import concurrent.futures as cf
nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3
def _findmatch(listnumber, ref):
print('def _findmatch(listnumber, ref):')
x=''
listnumber=str(listnumber)
ref = str(ref)
print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
if ref in listnumber:
x = listnumber
print('x = {0}'.format(x))
return x
a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
print(n)
if str(ref[0]) in n:
print('match')
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
#for n in executor.map(_findmatch, numberlist):
for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
print(type(n))
print(n)
if str(ref[0]) in n:
print('match')
运行上面的代码,我发现 map
函数能够达到我想要的结果。然而,当我将相同的条款转移到 concurrent.futures.ProcessPoolExecutor.map() 时,python3.5 失败并出现此错误:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
obj = ForkingPickler.dumps(obj)
File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed
问题 2:为什么会出现此错误?如何让 concurrent.futures.ProcessPoolExecutor.map() 调用具有 1 个以上参数的函数?
最佳答案
首先回答你的第二个问题,你得到一个异常,因为你正在使用的 lambda
函数是不可 picklable 的。由于 Python 使用 pickle
协议(protocol)序列化在主进程和 ProcessPoolExecutor
的工作进程之间传递的数据,这是一个问题。根本不清楚为什么要使用 lambda
。您拥有的 lambda 有两个参数,就像原始函数一样。您可以直接使用 _findmatch
而不是 lambda
,它应该可以工作。
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(_findmatch, numberlist, ref):
...
至于第一个关于在不创建巨型列表的情况下传递第二个常量参数的问题,您可以通过多种方式解决。一种方法可能是使用 itertools.repeat
创建一个可迭代对象,该对象在迭代时永远重复相同的值。
但更好的方法可能是编写一个额外的函数来为您传递常量参数。 (也许这就是您尝试使用 lambda
函数的原因?)如果您使用的函数可以在模块的顶级命名空间中访问,它应该可以工作:
def _helper(x):
return _findmatch(x, 5)
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(_helper, numberlist):
...
关于python - 如何将具有多个参数的函数传递给 python concurrent.futures.ProcessPoolExecutor.map()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42056738/