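I have an application that uses a combination of grequests and multiprocessing.managers for IPC communication and asynchronous RESTful communication over HTTP.
It seems that grequests, by calling gevent.monkey's patch_all() method, breaks the multiprocessing.connection module used by multiprocessing.managers.SyncManager and the classes derived from it.
This is clearly not an isolated problem: it affects any use case that relies on multiprocessing.connection, such as multiprocessing.pool.
Digging into the code in gevent/monkey.py, I found that swapping the stdlib socket module for gevent.socket is what causes the breakage.
This can be found at line 115 of gevent/monkey.py, in the patch_socket() function: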
def patch_socket(dns=True, aggressive=True):
    """Replace the standard socket object with gevent's cooperative sockets.
    ...
    _socket.socket = socket.socket  # This line breaks multiprocessing.connection!
    ...
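To illustrate why replacing socket.socket can affect code that never imports gevent, here is a minimal sketch that uses only the standard library. CooperativeStandIn and library_timeout are hypothetical names invented for this illustration; the stand-in is not gevent's implementation and only mimics a socket that is non-blocking underneath. The sketch targets Python 3.

```python
import socket


class CooperativeStandIn(socket.socket):
    """Hypothetical stand-in for a patched socket class (not gevent's code)."""

    def __init__(self, *args, **kwargs):
        super(CooperativeStandIn, self).__init__(*args, **kwargs)
        self.setblocking(False)  # mimic a socket that is non-blocking underneath


def library_timeout():
    """Stands in for library code that calls socket.socket(...) at call time."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        return sock.gettimeout()  # None means blocking, 0.0 means non-blocking
    finally:
        sock.close()


original_socket = socket.socket
before = library_timeout()
socket.socket = CooperativeStandIn  # a module-attribute swap, similar in spirit to patch_socket()
try:
    after = library_timeout()
finally:
    socket.socket = original_socket  # always undo the patch

print("before=%r after=%r" % (before, after))
```

The function itself never changes; only the object that the name socket.socket resolves to does, which is why a monkey patch reaches into unrelated modules.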
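My question is why this swap breaks multiprocessing.connection, and what the advantages are of using gevent.socket instead of the standard library's socket module. That is, what performance penalty, if any, results from not patching the socket module?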
Traceback
Traceback (most recent call last):
  File "clientWithGeventMonkeyPatch.py", line 49, in <module>
    client = GetClient(host, port, authkey)
  File "clientWithGeventMonkeyPatch.py", line 39, in GetClient
    client.connect()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 500, in connect
    conn = Client(self._address, authkey=self._authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 414, in answer_challenge
    response = connection.recv_bytes(256)        # reject large message
IOError: [Errno 11] Resource temporarily unavailable
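For reference, the numeric errno in the last line can be decoded with the standard library. This is only a small sketch: the symbolic name and message depend on the platform, and the variable names are illustrative. On Linux, errno 11 is the code a read returns when a non-blocking descriptor has no data ready.

```python
import errno
import os

code = 11  # the errno reported in the traceback above

# errno.errorcode maps numeric codes to symbolic names on the current platform.
name = errno.errorcode.get(code, "unknown")

# os.strerror gives the human-readable message for the same code.
message = os.strerror(code)

print("%d %s: %s" % (code, name, message))
```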
Code to reproduce the error
(on Ubuntu Server 11.10, Python 2.7.3, with gevent, greenlet and grequests installed)
manager.py
## manager.py
import multiprocessing
import multiprocessing.managers
import datetime

class LocalManager(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'LocalManager'

def GetManager(host, port, authkey):
    def getdatetime():
        return '{}'.format(datetime.datetime.now())
    LocalManager.register('getdatetime', callable = getdatetime)
    manager = LocalManager(address = (host, port), authkey = authkey)
    manager.start()
    return manager

if __name__ == '__main__':
    # define our manager connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'
    # start a manager
    man = GetManager(host, port, authkey)
    # wait for user input to shut down
    raw_input('return to shutdown')
    man.shutdown()
client.py
## client.py -- this one works
import time
import multiprocessing.managers

class RemoteClient(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'RemoteClient'

def GetClient(host, port, authkey):
    RemoteClient.register('getdatetime')
    client = RemoteClient(address = (host, port), authkey = authkey)
    client.connect()
    return client

if __name__ == '__main__':
    # define our client connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'
    # connect a client to the running manager
    client = GetClient(host, port, authkey)
    print 'connected', client
    print 'client.getdatetime()', client.getdatetime()
    # wait a couple of seconds, then do it again
    time.sleep(2)
    print 'client.getdatetime()', client.getdatetime()
    # exit...
clientWithGeventMonkeyPatch.py
## clientWithGeventMonkeyPatch.py -- breaks, depending on patch_all() parameters
import time
import multiprocessing.managers

# this part is copied from grequests
# bear in mind that it doesn't actually do anything in this module.
try:
    import gevent
    from gevent import monkey as curious_george
    from gevent.pool import Pool
except ImportError:
    raise RuntimeError('Gevent is required for grequests.')

# this line causes breakage of the multiprocessing.managers connection auth method:
# Monkey-patch.
# patch_all() parameters with default values: socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True, aggressive=True
curious_george.patch_all(thread=False, select=False) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = False) # works!
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = True, dns = True) # same as (thread=False, select=False); breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = True, dns = False) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = False, dns = True) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = False, dns = False) # breaks

class RemoteClient(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'RemoteClient'

def GetClient(host, port, authkey):
    RemoteClient.register('getdatetime')
    client = RemoteClient(address = (host, port), authkey = authkey)
    client.connect()
    return client

if __name__ == '__main__':
    # define our client connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'
    # connect a client to the running manager
    client = GetClient(host, port, authkey)
    print 'connected', client
    print 'client.getdatetime()', client.getdatetime()
    # wait a couple of seconds, then do it again
    time.sleep(2)
    print 'client.getdatetime()', client.getdatetime()
    # exit...
Best answer
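If you don't patch the socket module, gevent's ability to perform network operations without blocking is unavailable, so most of the benefit of using gevent is lost in the first place.
gevent and multiprocessing aren't really designed to play nicely with each other: gevent mostly assumes that you do your networking through it, rather than bypassing the top-level Python socket interface (as multiprocessing does).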
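One way to picture that bypass is the standard-library-only sketch below (no gevent involved). It rests on two assumptions not stated in the answer: that a cooperative socket keeps its underlying descriptor in non-blocking mode and hides that behind its own methods, and that multiprocessing.connection in Python 2.7 reads from the descriptor directly. Under those assumptions, a direct read fails with EAGAIN (errno 11 on Linux) whenever the peer's data has not arrived yet. The socketpair and the raw_read helper are illustrative only and assume a Unix-like system.

```python
import errno
import os
import socket


def raw_read(fd):
    """Read from the descriptor directly, bypassing the socket object's methods."""
    try:
        return os.read(fd, 256)
    except OSError as exc:
        return exc.errno  # report the error code instead of raising


left, right = socket.socketpair()
left.setblocking(False)  # assumed state of a cooperative socket's descriptor

empty_result = raw_read(left.fileno())  # nothing has been sent yet
right.sendall(b"challenge")
ready_result = raw_read(left.fileno())  # the data is now buffered locally

left.close()
right.close()

print("empty read is EAGAIN: %s" % (empty_result == errno.EAGAIN))
print("read after send: %r" % (ready_result,))
```

A caller that goes through the cooperative socket's own recv() would instead wait for readiness and retry, which is the step a direct descriptor read skips.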
Source: a similar question on Stack Overflow, "python - Why does gevent.socket break multiprocessing.connection's auth": https://stackoverflow.com/questions/14736766/