python - Defining a variable outside vs. inside __main__ in multiprocessing

Tags: python multithreading multiprocessing python-multiprocessing

I have the following code:

import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
  semaphore.acquire()
  time.sleep(1)
  element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
  print "WRITE -> " + element
  output.put(element)
  time.sleep(1)
  semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside queue
    """

    output    = multiprocessing.Manager().Queue()
    pool      = multiprocessing.Pool(4)
    lst       = range(40)

    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    # semaphore = multiprocessing.Semaphore(1)

    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()

    counter = 0
    while True:
      try:
        print "READ  <- " + output.get_nowait()
        counter += 1
        if (counter == len(lst)):
          print "Break"
          break
      except:
        print "READ  <- NOTHING IN BUFFER"  
        pass
      time.sleep(1)

This code works as expected and prints:

READ  <- NOTHING IN BUFFER
WRITE -> PROCESS: 0 PID: 15803 PPID: 15798
READ  <- NOTHING IN BUFFER
READ  <- PROCESS: 0 PID: 15803 PPID: 15798
READ  <- NOTHING IN BUFFER
WRITE -> PROCESS: 1 PID: 15806 PPID: 15798
READ  <- PROCESS: 1 PID: 15806 PPID: 15798
...

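Then I have this version, which does not work (it is essentially the same as the first one, except that the semaphore is defined in a different place):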

import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
# semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
  print hex(id(semaphore))
  semaphore.acquire()
  time.sleep(1)
  element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
  print "WRITE -> " + element
  output.put(element)
  time.sleep(1)
  semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside queue
    """

    output    = multiprocessing.Manager().Queue()
    pool      = multiprocessing.Pool(4)
    lst       = range(40)

    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    semaphore = multiprocessing.Semaphore(1)

    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()

    counter = 0
    while True:
      try:
        print "READ  <- " + output.get_nowait()
        counter += 1
        if (counter == len(lst)):
          print "Break"
          break
      except:
        print "READ  <- NOTHING IN BUFFER"  
        pass
      time.sleep(1)

This version prints:

READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
...

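It looks as if the producers never write anything to the queue. I read somewhere that apply_async does not print error messages, so in the second version I changed pool.apply_async(producer, (i, output)) to pool.apply(producer, (i, output)) to see what is going on. It turns out that semaphore is not defined; this is the output: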

Traceback (most recent call last):
  File "glob_var_wrong.py", line 31, in <module>
    pool.apply(producer, (i, output))
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: global name 'semaphore' is not defined

However, the following code runs correctly and prints 10 (the value defined inside __main__):

global_var = 20

def print_global_var():
    print global_var

if __name__ == '__main__':
    global_var = 10
    print_global_var()

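In this code a global variable can apparently be defined inside __main__, whereas in the previous code that was not possible. At first I assumed that variables defined inside __main__ are not shared between processes, but that affects only semaphore and not output, pool, or lst. Why does this happen?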

Best answer

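When a new process is created with multiprocessing.Process (which Pool uses under the hood), the child receives a copy of the parent's state as it exists at that moment. On Unix, which the /usr/lib/python2.7 paths in your traceback point to, this is done with fork, so the workers see only the names that had already been defined when the pool was created.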

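Because semaphore had not yet been defined when you called Pool(4), the name does not exist in the worker processes that run your code, and producer raises an exception there. output is unaffected because it is passed to producer explicitly as an argument, and pool and lst are only ever used in the parent process.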
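
The effect can be reproduced in a few lines. The following is a minimal sketch, not the asker's code: it is written for Python 3 (the question uses Python 2), it requests the fork start method explicitly and therefore only runs on Unix, and the names read_global, late_value and demo are made up for illustration.

```python
import multiprocessing


def read_global():
    """Report whether the module-level name late_value is visible here."""
    try:
        return "value is %r" % (late_value,)
    except NameError:
        return "NameError"


def demo():
    global late_value
    # Start from a clean slate so the demo can be run more than once.
    globals().pop("late_value", None)
    ctx = multiprocessing.get_context("fork")  # assumption: Unix only
    pool = ctx.Pool(2)   # the workers are forked here
    late_value = 10      # defined in the parent only, after the fork
    try:
        in_worker = pool.apply(read_global)  # runs in a worker process
        in_parent = read_global()            # runs in the parent process
    finally:
        pool.close()
        pool.join()
    return in_worker, in_parent


if __name__ == "__main__":
    print(demo())
```

On a Unix system this should report "NameError" for the worker and "value is 10" for the parent: the parent's later assignment never reaches the already-forked workers.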

To see the error for yourself, change the definition of producer to:

def producer(num, output):
    try:
        # Any use of the undefined name raises NameError, so keep it inside the try
        print hex(id(semaphore))
        semaphore.acquire()
    except Exception as e:
        print e
        return  # nothing was acquired, so do not go on to release()
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()

Now your failing code prints a series of (40) errors that look like this:

global name 'semaphore' is not defined
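
The error can also be surfaced from the parent side, without editing producer: apply_async returns a result object, and calling get() on it re-raises whatever exception the worker hit. A minimal sketch for Python 3, again assuming the fork start method (Unix only); broken_task and collect_error are hypothetical names used only here.

```python
import multiprocessing


def broken_task():
    # Deliberately refers to a name that is defined nowhere.
    return undefined_name


def collect_error():
    ctx = multiprocessing.get_context("fork")  # assumption: Unix only
    pool = ctx.Pool(1)
    try:
        async_result = pool.apply_async(broken_task)
        # Up to this point nothing has been reported in the parent;
        # the exception is stored in the result object.
        try:
            async_result.get(timeout=30)
        except NameError as exc:
            return type(exc).__name__
        return None
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(collect_error())
```

Keeping the result objects and calling get() on each of them is what the commented-out res.get() line in the question was hinting at.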

That is why the semaphore must be defined before Pool is created.
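
If defining the semaphore at module level is inconvenient, one alternative is to hand it to each worker when the pool is created, through the initializer and initargs parameters of Pool. The sketch below is one possible arrangement, not the asker's code: it targets Python 3, assumes the fork start method (Unix only), uses the illustrative names init_worker, _semaphore and run, and strips producer down to returning a string.

```python
import multiprocessing

_semaphore = None  # set in every worker by init_worker


def init_worker(sem):
    """Runs once in each worker process when the pool starts it."""
    global _semaphore
    _semaphore = sem


def producer(num):
    # Only one worker at a time may hold the semaphore.
    with _semaphore:
        return "PROCESS: %d" % num


def run(n_tasks=4):
    ctx = multiprocessing.get_context("fork")  # assumption: Unix only
    semaphore = ctx.Semaphore(1)  # local variable, not a module global
    pool = ctx.Pool(2, initializer=init_worker, initargs=(semaphore,))
    try:
        results = [pool.apply_async(producer, (i,)) for i in range(n_tasks)]
        # get() would re-raise any exception from the worker.
        return [r.get(timeout=30) for r in results]
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(run())
```

Here the semaphore exists before the pool is created and is handed to the workers explicitly, so it no longer matters whether it lives inside or outside the __main__ block.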

Regarding python - Defining a variable outside vs. inside __main__ in multiprocessing, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32365242/
