python - 为什么 gevent.socket 会破坏 multiprocessing.connection 的授权

标签 python connection multiprocessing gevent monkey

我有一个同时使用 grequests 的应用程序和 multiprocessing.managers用于通过 HTTP 进行 IPC 通信和异步 RESTful 通信的组合。

似乎grequests , 在使用 gevent.monkeypatch_all()方法,打破了multiprocessing.connection multiprocessing.manager.SyncManager 使用的模块类及其派生类。

这显然不是一个孤立的问题,但会影响任何实现 multiprocessing.connetion 的用例,例如 multiprocessing.pool , 例如。

深入研究 gevent/monkey.py 中的代码,我发现 stdlib socket 的交换模块与 gevent.socket是导致破损的原因。 这可以在 gevent/monkey.py 的第 115 行找到。在patch_socket()下功能:

def patch_socket(dns=True, aggressive=True):
    """Replace the standard socket object with gevent's cooperative sockets.
    ...
    _socket.socket = socket.socket # This line breaks multiprocessing.connection!
    ...

我的问题是为什么这个交换页会中断 multiprocessing.connection ,以及使用 gevent.socket 有哪些优势?而不是标准库的 socket模块?也就是说,不修补 socket 会导致什么性能损失(如果有的话)模块?

回溯

Traceback (most recent call last):
  File "clientWithGeventMonkeyPatch.py", line 49, in <module>
    client = GetClient(host, port, authkey)
  File "clientWithGeventMonkeyPatch.py", line 39, in GetClient
    client.connect()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 500, in connect
    conn = Client(self._address, authkey=self._authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 414, in answer_challenge
    response = connection.recv_bytes(256)        # reject large message
IOError: [Errno 11] Resource temporarily unavailable

重现错误的代码

(在 ubuntu 服务器 11.10,python2.7.3 上,安装了 gevent,greenlet 和 grequests)

经理.py

## manager.py
import multiprocessing
import multiprocessing.managers
import datetime


class LocalManager(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'LocalManager'

def GetManager(host, port, authkey):
    def getdatetime():
        return '{}'.format(datetime.datetime.now())

    LocalManager.register('getdatetime', callable = getdatetime)
    manager = LocalManager(address = (host, port), authkey = authkey)
    manager.start()

    return manager

if __name__ == '__main__':
    # define our manager connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'

    # start a manager
    man = GetManager(host, port, authkey)

    # wait for user input to shut down
    raw_input('return to shutdown')
    man.shutdown()

客户端.py

## client.py -- this one works
import time
import multiprocessing.managers

class RemoteClient(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'RemoteClient'

def GetClient(host, port, authkey):
    RemoteClient.register('getdatetime')
    client = RemoteClient(address = (host, port), authkey = authkey)
    client.connect()
    return client

if __name__ == '__main__':
    # define our client connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'

    # start a manager
    client = GetClient(host, port, authkey)
    print 'connected', client
    print 'client.getdatetime()', client.getdatetime()
    # wait a couple of seconds, then do it again
    time.sleep(2)
    print 'client.getdatetime()', client.getdatetime()

    # exit...

clientWithGeventMonkeyPatch.py​​

## clientWithGeventMonkeyPatch.py -- breaks, depending on patch_all() parameters        
import time
import multiprocessing.managers


# this part is copied from grequests
# bear in mind that it doesn't actually do anything in this module.
try:
    import gevent
    from gevent import monkey as curious_george
    from gevent.pool import Pool
except ImportError:
    raise RuntimeError('Gevent is required for grequests.')

# this line causes breakage of the multiprocessing.manager connection auth method:
# Monkey-patch. 
# patch_all() parameters with default values:  socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True, aggressive=True

curious_george.patch_all(thread=False, select=False) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = False) # works!
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = True, dns = True) # same as (thread=False, select=False); breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = True, dns = False) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = False, dns = True) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = False, dns = False) # breaks







class RemoteClient(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'RemoteClient'

def GetClient(host, port, authkey):
    RemoteClient.register('getdatetime')
    client = RemoteClient(address = (host, port), authkey = authkey)
    client.connect()
    return client

if __name__ == '__main__':
    # define our client connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'

    # start a manager
    client = GetClient(host, port, authkey)
    print 'connected', client
    print 'client.getdatetime()', client.getdatetime()
    # wait a couple of seconds, then do it again
    time.sleep(2)
    print 'client.getdatetime()', client.getdatetime()

    # exit...

最佳答案

如果不给套接字模块打补丁,gevent不阻止网络操作的能力将不可用,因此使用 gevent 的大部分好处首先将不可用。

geventmultiprocessing并不是真正设计来与彼此很好地玩耍 - gevent主要假设您正在通过它进行网络连接,而不是绕过最高级别的 Python 套接字接口(interface)(multiprocessing 会这样做)。

关于python - 为什么 gevent.socket 会破坏 multiprocessing.connection 的授权,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14736766/

相关文章:

python - ZeroMQ 和 Python 中的多个订阅过滤器

python - 正确初始化列表

android - 为什么不建议将移动应用程序直接连接到云数据库,而是我们需要一个 Web 服务作为中间层?

python - 多处理写入 Pandas 数据框

python - 运行时警告 : coroutine was never awaited

python - "raise"在 "try"或 "except" block 之外的 python 函数的末尾

java - JDBC寻找未关闭连接的来源?

c# - 为什么 LINQ 突然开始对数据库连接表现得很奇怪

python - 根据 Python 中的 CPU 使用率更改要生成的进程数

python多处理共享队列重新排序