我尝试使用 txredis(redis 的非阻塞扭曲 api)作为持久消息队列但未成功,我正在尝试使用我正在处理的 scrapy 项目进行设置。我发现虽然客户端没有阻塞,但它变得比本来应该慢得多,因为 react 器循环中本应是一个事件的事件被分成了数千个步骤。
因此,我尝试使用 redis-py(常规阻塞扭曲 api)并将调用包装在延迟线程中。它工作得很好,但是我想在调用 redis 时执行内部延迟,因为我想设置连接池以尝试进一步加快速度。
下面是我对从延迟线程的扭曲文档中获取的一些示例代码的解释,以说明我的用例:
#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall():
print 'doing lookup... this may take a while'
time.sleep(10)
return 'results from redis'
def result(res):
print res
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = threads.deferToThread(aBlockingRedisCall)
d.addCallback(result)
reactor.run()
if __name__=='__main__':
main()
这是我对连接池的更改,它使延迟线程中的代码阻塞:
#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
time.sleep(10) # this is now blocking.. any ideas?
d = defer.Deferred()
d.addCallback(gotFinalResult)
d.callback(x)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
所以我的问题是,有谁知道为什么我的更改会导致延迟线程阻塞和/或谁能提出更好的解决方案?
最佳答案
嗯,作为twisted docs说:
Deferreds do not make the code magically not block
无论何时使用阻塞代码,例如 sleep
,您都必须将其推迟到新线程。
#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
def getstuff( x ):
time.sleep(3)
return "stuff is %s" % x
# getstuff is blocking, so you need to push it to a new thread
d = threads.deferToThread(getstuff, x)
d.addCallback(gotFinalResult)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
如果 redis api 不是很复杂,使用 twisted.web 重写它可能更自然,而不是在很多线程中调用阻塞 api。
关于python - Twisted:为什么将延迟回调传递给延迟线程会使线程突然阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2466000/