I have the following code:
import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
    semaphore.acquire()
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside queue
    """
    output = multiprocessing.Manager().Queue()
    pool = multiprocessing.Pool(4)
    lst = range(40)
    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    # semaphore = multiprocessing.Semaphore(1)
    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()
    counter = 0
    while True:
        try:
            print "READ <- " + output.get_nowait()
            counter += 1
            if (counter == len(lst)):
                print "Break"
                break
        except:
            print "READ <- NOTHING IN BUFFER"
            pass
        time.sleep(1)
This code works as expected and prints:
READ <- NOTHING IN BUFFER
WRITE -> PROCESS: 0 PID: 15803 PPID: 15798
READ <- NOTHING IN BUFFER
READ <- PROCESS: 0 PID: 15803 PPID: 15798
READ <- NOTHING IN BUFFER
WRITE -> PROCESS: 1 PID: 15806 PPID: 15798
READ <- PROCESS: 1 PID: 15806 PPID: 15798
...
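Then I have this version, which does not work (it is basically the same as the first one, except that the semaphore is defined in a different place):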
import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
# semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
    print hex(id(semaphore))
    semaphore.acquire()
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside queue
    """
    output = multiprocessing.Manager().Queue()
    pool = multiprocessing.Pool(4)
    lst = range(40)
    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    semaphore = multiprocessing.Semaphore(1)
    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()
    counter = 0
    while True:
        try:
            print "READ <- " + output.get_nowait()
            counter += 1
            if (counter == len(lst)):
                print "Break"
                break
        except:
            print "READ <- NOTHING IN BUFFER"
            pass
        time.sleep(1)
This version prints:
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
...
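It looks as if the producer never writes anything to the queue. I read somewhere that apply_async does not print error messages, so in the second version I changed pool.apply_async(producer, (i, output)) to pool.apply(producer, (i, output)) to see what was really going on. It seems that semaphore is not defined; this is the output: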
Traceback (most recent call last):
  File "glob_var_wrong.py", line 31, in <module>
    pool.apply(producer, (i, output))
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: global name 'semaphore' is not defined
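Switching to the blocking pool.apply is not the only way to see this error. The sketch below is a minimal Python 3 illustration (Python 3.3+ is assumed so that Pool can be used as a context manager; the question itself uses Python 2.7). It keeps the AsyncResult objects returned by apply_async and calls get() on each one, which re-raises the worker's exception in the parent. The stripped-down producer and the collect_errors helper are hypothetical names. In Python 3.2+, apply_async also accepts an error_callback argument that receives the exception instead.

```python
import multiprocessing


def producer(num):
    # 'semaphore' is deliberately never defined anywhere, which reproduces
    # the NameError from the failing version of the question.
    semaphore.acquire()
    return num


def collect_errors(tasks=4):
    """Submit tasks with apply_async and gather the exceptions they raised."""
    errors = []
    with multiprocessing.Pool(2) as pool:
        results = [pool.apply_async(producer, (i,)) for i in range(tasks)]
        for res in results:
            try:
                res.get(timeout=10)  # re-raises the worker's exception here
            except NameError as exc:
                errors.append(str(exc))
    return errors


if __name__ == "__main__":
    for message in collect_errors():
        print(message)
```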
But the following code runs correctly and prints 10 (the value defined in __main__):
global_var = 20

def print_global_var():
    print global_var

if __name__ == '__main__':
    global_var = 10
    print_global_var()
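In this code, a global variable apparently can be defined in __main__, whereas in the previous code that was not possible. At first I assumed that variables defined in __main__ are not shared between processes, but that only seems to affect semaphore and not output, pool, or lst. Why does this happen?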
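The global_var example works because an if block, including if __name__ == '__main__':, does not create a new scope: the assignment inside it rebinds the module-level name, and print_global_var runs in the same process. The small Python 3 sketch below (SOURCE and value_seen_when_run_as are hypothetical names) executes the same module text under two different __name__ values. '__mp_main__' is used because, in CPython's multiprocessing, that is the name under which spawn-based child processes re-import the main script, so such children skip the guarded code.

```python
# The question's global_var module, as text, so it can be run twice.
SOURCE = '''
global_var = 20

def print_global_var():
    return global_var

if __name__ == '__main__':
    global_var = 10
'''


def value_seen_when_run_as(module_name):
    # Execute SOURCE as a module body whose __name__ is module_name.
    namespace = {"__name__": module_name}
    exec(SOURCE, namespace)
    # The function's globals are this namespace, so it sees whatever
    # value global_var ended up with.
    return namespace["print_global_var"]()


if __name__ == "__main__":
    print(value_seen_when_run_as("__main__"))     # guarded assignment ran
    print(value_seen_when_run_as("__mp_main__"))  # guarded assignment skipped
```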
Best answer
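When you create new processes with multiprocessing.Process (which Pool uses under the hood), each child starts out as a copy of the parent. On Linux, as in your traceback, the copy is made with fork, so a child sees the parent's global variables exactly as they were at the moment it was created, and Pool(4) creates its four worker processes right away.
Since you do not define the variable semaphore until after calling Pool(4), it does not exist in the worker processes that run producer, so producer raises a NameError there. output is not affected because it is passed to producer as an argument and pickled along with each task, and pool and lst are never referenced inside producer at all.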
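The timing can be checked directly. This minimal Python 3 sketch (probe and sees_semaphore are hypothetical names) counts the child processes right after Pool() returns, then defines semaphore in the parent and asks the workers whether that global exists in their own process. With fork the workers were copied before the assignment; with spawn they re-import the module, where the name is never set; so either way they report False.

```python
import multiprocessing


def sees_semaphore(_):
    # Executed inside a worker: does *this* process have a global 'semaphore'?
    return "semaphore" in globals()


def probe(workers=4):
    global semaphore
    pool = multiprocessing.Pool(workers)
    try:
        # Pool() has already started its worker processes at this point.
        started = len(multiprocessing.active_children())
        # Defined in the parent only now, after the workers exist.
        semaphore = multiprocessing.Semaphore(1)
        seen = pool.map(sees_semaphore, range(workers))
    finally:
        pool.terminate()
        pool.join()
        # Remove the global again so that probe() can be called repeatedly.
        globals().pop("semaphore", None)
    return started, seen


if __name__ == "__main__":
    print(probe())
```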
To see this, change the definition of producer to:
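def producer(num, output):
    try:
        # The first lookup of semaphore is what fails, so it must be inside the try
        print hex(id(semaphore))
        semaphore.acquire()
    except Exception as e:
        print e
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    # Still raises NameError in the failing version; apply_async swallows it
    semaphore.release()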
Now your failing code will print a bunch of errors (40 of them) that look like
global name 'semaphore' is not defined
That is why the semaphore must be defined before Pool is called, that is, before the worker processes are created.
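If the semaphore should stay inside the __main__ block, a common alternative (not part of the original answer) is to hand it to each worker through Pool's initializer and initargs parameters, which run once as each worker starts. Passing the semaphore as an ordinary apply_async argument does not work, because multiprocessing only allows it to be shared through inheritance. This is a minimal Python 3 sketch; init_worker and run are hypothetical names, and the sleeps from the question are left out.

```python
import multiprocessing
import os


def init_worker(sem):
    # Runs once in every worker process and stores the inherited
    # semaphore as a module-level global that producer() can find.
    global semaphore
    semaphore = sem


def producer(num, output):
    with semaphore:  # acquire() on entry, release() on exit
        element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
        output.put(element)
    return num


def run(n=8):
    sem = multiprocessing.Semaphore(1)
    with multiprocessing.Manager() as manager:
        output = manager.Queue()
        with multiprocessing.Pool(4, initializer=init_worker,
                                  initargs=(sem,)) as pool:
            results = [pool.apply_async(producer, (i, output)) for i in range(n)]
            for res in results:
                res.get(timeout=30)  # re-raises any worker exception here
        return [output.get() for _ in range(n)]


if __name__ == "__main__":
    for line in run():
        print("READ <- " + line)
```

Because every task has been waited on with get(), the queue holds exactly n elements when it is drained, so the polling loop from the question is not needed in this sketch.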
Regarding "python - Outside vs. inside __main__ variable definition in multiprocessing", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32365242/