python : multiprocessing and Array of c_char_p

标签 python multiprocessing ctypes

我正在启动 3 个进程,我希望它们将一个字符串放入一个共享数组中,在与进程 (i) 对应的索引处。

看下面的代码,生成的输出是:

['test 0', None, None]
['test 1', 'test 1', None]
['test 2', 'test 2', 'test 2']

为什么 'test 0' 会被 'test 1' 覆盖,而 'test 1' 又会被 'test 2' 覆盖?

我想要的是(顺序不重要):

['test 0', None, None]
['test 0', 'test 1', None]
['test 0', 'test 1', 'test 2']

代码:

#!/usr/bin/env python

import multiprocessing
from multiprocessing import Value, Lock, Process, Array
import ctypes
from ctypes import c_int, c_char_p

class Consumer(multiprocessing.Process):
    """Worker process that pulls tasks off a queue and runs them.

    Every task is called with the shared ``arr`` and ``lock``; whatever it
    returns is pushed onto ``result_queue``.  A ``None`` item on
    ``task_queue`` tells the worker to stop.
    """

    def __init__(self, task_queue, result_queue, arr, lock):
        super(Consumer, self).__init__()
        self.lock = lock
        self.arr = arr
        self.result_queue = result_queue
        self.task_queue = task_queue

    def run(self):
        """Process tasks until the ``None`` sentinel arrives."""
        inbox, outbox = self.task_queue, self.result_queue
        while True:
            job = inbox.get()
            if job is None:
                # The sentinel is acknowledged too, so a join() on the
                # queue is not left waiting for it.
                inbox.task_done()
                return
            outcome = job(arr=self.arr, lock=self.lock)
            inbox.task_done()
            outbox.put(outcome)

class Task(object):
    """Callable unit of work that writes ``"test <i>"`` into slot ``i`` of an array."""

    def __init__(self, i):
        # Index of the array slot this task writes to.
        self.i = i

    def __call__(self, arr=None, lock=None):
        """Store this task's label in ``arr[self.i]`` and print the whole array.

        Both steps run while ``lock`` is held.  Returns ``None``.
        """
        with lock:
            # NOTE(review): in this script ``arr`` is Array(ctypes.c_char_p, 3),
            # so this stores a *pointer* to a string owned by the current
            # process.  The multiprocessing docs warn that such a pointer is
            # likely invalid in any other process -- which is presumably why
            # the printed slots do not show the values the other workers wrote.
            arr[self.i] = "test %d" % self.i
            print arr[:]

    def __str__(self):
        # Fixed label; str() is not called on tasks anywhere in the code shown.
        return 'ARC'

    def run(self):
        # Not invoked by Consumer, which calls the task object directly.
        print 'IN'

if __name__ == '__main__':
    # State shared with every worker: a 3-slot array of C string pointers
    # and the lock that serialises access to it.
    arr = Array(ctypes.c_char_p, 3)
    lock = multiprocessing.Lock()

    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_consumers = multiprocessing.cpu_count() * 2
    consumers = []
    for _ in xrange(num_consumers):
        consumers.append(Consumer(tasks, results, arr, lock))

    for worker in consumers:
        worker.start()

    # One task per array slot.
    for slot in xrange(3):
        tasks.put(Task(slot))

    # One None sentinel per worker so that every worker loop terminates.
    for _ in xrange(num_consumers):
        tasks.put(None)

我正在运行 Python 2.7.3 (Ubuntu)

最佳答案

这个问题似乎与另一个问题(this one)类似。在那里,J.F. Sebastian 推测:对 arr[i] 赋值会使 arr[i] 指向一个内存地址,而该地址只对执行赋值的那个子进程有意义;其他子进程查看该地址时,读到的是垃圾数据。

至少有两种方法可以避免这个问题。一种是使用 multiprocessing.Manager 创建的列表(manager.list):

import multiprocessing as mp

class Consumer(mp.Process):
    """Worker process that pulls tasks off a queue and runs them.

    Every task is called with the shared ``lock`` and ``lst``; whatever it
    returns is pushed onto ``result_queue``.  A ``None`` item on
    ``task_queue`` tells the worker to stop.
    """

    def __init__(self, task_queue, result_queue, lock, lst):
        super(Consumer, self).__init__()
        self.lst = lst
        self.lock = lock
        self.result_queue = result_queue
        self.task_queue = task_queue

    def run(self):
        """Process tasks until the ``None`` sentinel arrives."""
        inbox, outbox = self.task_queue, self.result_queue
        while True:
            job = inbox.get()
            if job is None:
                # The sentinel is acknowledged too, so tasks.join() in the
                # parent is not left waiting for it.
                inbox.task_done()
                return
            outcome = job(lock=self.lock, lst=self.lst)
            inbox.task_done()
            outbox.put(outcome)

class Task(object):
    """Callable unit of work that writes ``"test <i>"`` into slot ``i`` of a list."""

    def __init__(self, i):
        # Index of the list slot this task writes to.
        self.i = i

    def __call__(self, lock, lst):
        """Store this task's label in ``lst[self.i]`` and print the whole list.

        Both steps run while ``lock`` is held.  ``lst`` may be a plain list
        or a manager list proxy.  Returns ``None``.
        """
        with lock:
            lst[self.i] = "test {}".format(self.i)
            # A slice returns an ordinary list copy covering every slot in a
            # single request, whatever the length of ``lst`` (the length was
            # previously hard-coded to 3).
            print(lst[:])

if __name__ == '__main__':
    tasks = mp.JoinableQueue()
    results = mp.Queue()  # filled by the workers; never read in this example

    # The manager keeps the list in its own server process and hands out
    # proxies, so workers exchange real strings instead of raw pointers.
    manager = mp.Manager()
    lst = manager.list([''] * 3)

    lock = mp.Lock()
    num_consumers = mp.cpu_count() * 2
    # range() instead of xrange() so the script also runs on Python 3.
    consumers = [Consumer(tasks, results, lock, lst)
                 for _ in range(num_consumers)]

    for w in consumers:
        w.start()

    # One task per list slot.
    for i in range(3):
        tasks.put(Task(i))

    # One None sentinel per worker so that every worker loop terminates.
    for _ in range(num_consumers):
        tasks.put(None)

    # Block until every queued item (sentinels included) is marked done.
    tasks.join()

另一种方法是使用固定大小的共享数组,例如 mp.Array('c', 10):

import multiprocessing as mp

class Consumer(mp.Process):
    """Worker process that pulls tasks off a queue and runs them.

    Every task is called with the shared ``arr`` and ``lock``; whatever it
    returns is pushed onto ``result_queue``.  A ``None`` item on
    ``task_queue`` tells the worker to stop.
    """

    def __init__(self, task_queue, result_queue, arr, lock):
        super(Consumer, self).__init__()
        self.lock = lock
        self.arr = arr
        self.result_queue = result_queue
        self.task_queue = task_queue

    def run(self):
        """Process tasks until the ``None`` sentinel arrives."""
        inbox, outbox = self.task_queue, self.result_queue
        while True:
            job = inbox.get()
            if job is None:
                # The sentinel is acknowledged too, so tasks.join() in the
                # parent is not left waiting for it.
                inbox.task_done()
                return
            outcome = job(arr=self.arr, lock=self.lock)
            inbox.task_done()
            outbox.put(outcome)

class Task(object):
    """Callable unit of work that writes ``"test <i>"`` into buffer ``i``."""

    def __init__(self, i):
        # Index of the buffer this task writes to.
        self.i = i

    def __call__(self, arr, lock):
        """Store this task's label in ``arr[self.i].value`` and print all values.

        ``arr`` is a sequence of fixed-size character buffers (the script
        uses ``mp.Array('c', 10)`` for each); the label must fit in the
        buffer.  Both steps run while ``lock`` is held.  Returns ``None``.
        """
        # Character buffers hold bytes: on Python 3 assigning a str to
        # ``.value`` raises TypeError, so encode first.  On Python 2 the
        # encode is a no-op for this ASCII label.
        label = "test {}".format(self.i).encode("ascii")
        with lock:
            arr[self.i].value = label
            print([a.value for a in arr])

if __name__ == '__main__':
    tasks = mp.JoinableQueue()
    results = mp.Queue()  # filled by the workers; never read in this example

    # One fixed-size (10-character) shared buffer per slot: the characters
    # themselves are stored in the buffer, so no pointer is shared.
    arr = [mp.Array('c', 10) for _ in range(3)]

    lock = mp.Lock()
    num_consumers = mp.cpu_count() * 2
    # range() throughout (the block previously mixed range and xrange) so
    # the script also runs on Python 3.
    consumers = [Consumer(tasks, results, arr, lock)
                 for _ in range(num_consumers)]

    for w in consumers:
        w.start()

    # One task per buffer.
    for i in range(3):
        tasks.put(Task(i))

    # One None sentinel per worker so that every worker loop terminates.
    for _ in range(num_consumers):
        tasks.put(None)

    # Block until every queued item (sentinels included) is marked done.
    tasks.join()

我推测,这种方法可行而 mp.Array(ctypes.c_char_p, 3) 不可行的原因是:mp.Array('c', 10) 的大小固定,因此内存地址永远不会改变;而 mp.Array(ctypes.c_char_p, 3) 中元素所指向的字符串大小可变,因此当 arr[i] 被赋值为更大的字符串时,内存地址可能会发生变化。

也许这正是文档(the docs)中以下警告所指的情况:

Although it is possible to store a pointer in shared memory remember that this will refer to a location in the address space of a specific process. However, the pointer is quite likely to be invalid in the context of a second process and trying to dereference the pointer from the second process may cause a crash.

关于 python : multiprocessing and Array of c_char_p,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14217297/

相关文章:

python - 如何使用IPy确定IP地址是否在白名单中?

python - 我会称之为高级元组操作吗?

python - 多处理:将类实例传递给 pool.map

python - 简化Python中的Ctype联合(在Windows中发送键盘事件)

python - 一起调试boost暴露的Python和C++

java - Eclipse 组织导入快捷方式 (Ctrl+Shift+O) 不起作用

python - 使用manager修改多进程进程之间的变量

python - python多处理的生产者/消费者问题

python - 使用 ctypes 将音频数据从 Python 传递到 C

python - 如何防止FUNCTYPE被收集