python - 在子进程已经启动后授予对共享内存的访问权限

标签 python ipc multiprocessing shared-memory

如果数据仅在子进程生成后才可用(使用 multiprocessing.Process),我如何让子进程访问共享内存中的数据?

我知道 multiprocessing.sharedctypes.RawArray ,但我无法弄清楚如何让我的子进程访问在进程已经启动后创建的 RawArray

数据是由父进程产生的,数据量是事先不知道的。

如果不是 GIL我会改用线程,这将使这项任务更简单一些。使用非 CPython 实现不是一种选择。


深入了解 muliprocessing.sharedctypes ,看起来共享的 ctype 对象被分配了 using mmaped memory .

所以这个问题真正归结为:如果 mmap() 在子进程生成后由父进程调用,子进程能否访问匿名映射的内存?

这有点符合 this question 中的要求。 , 除了在我的例子中 mmap() 的调用者是父进程而不是子进程。


(已解决)

我创建了自己的 RawArray 版本,它在底层使用了 shm_open()。只要标识符 (tag) 匹配,生成的共享 ctypes 数组就可以与任何进程共享。

参见 this answer有关详细信息和示例。

最佳答案

免责声明:我是问题的作者。

我最终使用了 posix_ipc模块创建我自己的版本 RawArray .我主要使用 posix_ipc.SharedMemory 调用 shm_open()在幕后。

我的实现 (ShmemRawArray) 公开了与 RawArray 相同的功能,但需要两个额外的参数 - 一个 tag 来唯一标识共享内存区域, 和一个 create 标志来确定我们是应该创建一个新的共享内存段还是附加到一个现有的共享内存段。

如果有人感兴趣,这里有一份副本:https://gist.github.com/1222327

ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)

使用说明:

  • 前两个参数(typecode_or_typesize_or_initializer)应该与 RawArray 一样工作。
  • 只要 tag 匹配,任何进程都可以访问共享数组。
  • 当原始对象(由 ShmemRawArray(..., create=True) 返回)被删除时,共享内存段被取消链接
  • 使用当前存在的标签创建共享数组将引发ExistentialError
  • 使用不存在的(或已取消链接的)标签访问共享数组也会引发ExistentialError

A SSCCE (简短、独立、可编译的示例)展示它的实际应用。

#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray

class Point(ctypes.Structure):
    _fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]

def worker(q):
    # get access to ctypes array shared by parent
    count, tag = q.get()
    shared_data = ShmemRawArray(Point, count, tag, False)

    proc_name = multiprocessing.current_process().name
    print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]

if __name__ == '__main__':
    procs = []
    np = multiprocessing.cpu_count()
    queue = multiprocessing.Queue()

    # spawn child processes
    for i in xrange(np):
        p = multiprocessing.Process(target=worker, args=(queue,))
        procs.append(p)
        p.start()

    # create a unique tag for shmem segment
    tag = "stack-overflow-%d" % multiprocessing.current_process().pid

    # random number of points with random data
    count = randint(3,10) 
    combined_data = [Point(x=random(), y=random()) for i in xrange(count)]

    # create ctypes array in shared memory using ShmemRawArray
    # - we won't be able to use multiprocssing.sharectypes.RawArray here 
    #   because children already spawned
    shared_data = ShmemRawArray(Point, combined_data, tag)

    # give children info needed to access ctypes array
    for p in procs:
        queue.put((count, tag))

    print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
    for p in procs:
        p.join()

运行结果如下:

[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']

关于python - 在子进程已经启动后授予对共享内存的访问权限,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7419159/

相关文章:

python - 如何在 Python 中使用 Managers() 在多个进程之间共享一个字符串?

python - 计算 pySpark 中非唯一列表元素的累积和

python - 将数据从 google api 导入到我的数据库

objective-c - 使用 perl 与正在运行的 OS X 应用程序进行通信

typescript - 从单个IPC调用获取多个IPC回调

python - 使用多处理的文件处理器

Python多处理无限循环

python - 有更快的替代方案吗?

python - 如何解决运行程序时出现黑屏的问题

c++ - Perl:IPC::Shareable 和 SWIG'ed C++ 对象不一致