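I have been following the Pika timed received example, and I would like a single client to handle more concurrent requests. My question is: can handle_delivery somehow be called each time a new message arrives, instead of waiting for the previous handle_delivery call to return?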
Best answer
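It looks like the call to handle_delivery is blocking, but you can use add_timeout to have it add a secondary handler to the I/O event loop. I think this is what you are trying to do: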
"""
Asynchronous AMQP consumer; do our processing via an ioloop timeout
"""
import sys
import time

from pika.adapters import SelectConnection
from pika.connection import ConnectionParameters

connection = None
channel = None


def on_connected(connection):
    print("timed_receive: Connected to RabbitMQ")
    connection.channel(on_channel_open)


def on_channel_open(channel_):
    global channel
    channel = channel_
    print("timed_receive: Received our Channel")
    channel.queue_declare(queue="test", durable=True,
                          exclusive=False, auto_delete=False,
                          callback=on_queue_declared)


class TimingHandler(object):
    count = 0
    last_count = 0

    def __init__(self, delay=0):
        self.start_time = time.time()
        self.delay = delay

    def handle_delivery(self, channel, method, header, body):
        # Return right away; the ioloop calls self() once the delay expires
        connection.add_timeout(self.delay, self)

    def __call__(self):
        self.count += 1
        # Report the throughput every 1000 messages
        if not self.count % 1000:
            now = time.time()
            duration = now - self.start_time
            sent = self.count - self.last_count
            rate = sent / duration
            self.last_count = self.count
            self.start_time = now
            print("timed_receive: %i Messages Received, %.4f per second" %
                  (self.count, rate))


def on_queue_declared(frame):
    print("timed_receive: Queue Declared")
    channel.basic_consume(TimingHandler().handle_delivery, queue='test', no_ack=True)


if __name__ == '__main__':
    # Connect to RabbitMQ
    host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
    connection = SelectConnection(ConnectionParameters(host),
                                  on_connected)
    # Loop until CTRL-C
    try:
        # Start our blocking loop
        connection.ioloop.start()
    except KeyboardInterrupt:
        # Close the connection
        connection.close()
        # Loop until the connection is closed
        connection.ioloop.start()
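
To make the effect of deferring work through a timeout easier to see without a broker, here is a minimal sketch. ToyLoop and demo are hypothetical names invented for this illustration; ToyLoop is only a stand-in with an add_timeout(delay, callback) method and is not how pika implements its ioloop. It shows that the delivery handler returns immediately, and that the deferred callbacks then run later on the same thread, one at a time.

```python
import heapq
import itertools
import time


class ToyLoop(object):
    """Hypothetical stand-in for an I/O loop (an assumption, not pika's code)."""

    def __init__(self):
        self._timers = []              # heap of (deadline, sequence, callback)
        self._seq = itertools.count()  # tie-breaker keeps equal deadlines FIFO

    def add_timeout(self, delay, callback):
        # Schedule callback to run `delay` seconds from now.
        entry = (time.time() + delay, next(self._seq), callback)
        heapq.heappush(self._timers, entry)

    def run(self):
        # Run scheduled callbacks one at a time, in deadline order.
        while self._timers:
            deadline, _, callback = heapq.heappop(self._timers)
            wait = deadline - time.time()
            if wait > 0:
                time.sleep(wait)
            callback()


def demo(messages):
    loop = ToyLoop()
    log = []

    def handle_delivery(body):
        # Do no real work here: record the arrival and schedule the processing.
        log.append("received %s" % body)
        loop.add_timeout(0, lambda: log.append("processed %s" % body))

    # Simulate every message being delivered before the loop gets to run.
    for body in messages:
        handle_delivery(body)
    loop.run()
    return log


if __name__ == "__main__":
    for line in demo(["a", "b"]):
        print(line)
```

In this sketch all deliveries are accepted before any processing starts, but the processing callbacks still run sequentially. Deferring keeps the delivery handler short; it does not by itself run the work in parallel.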
Regarding "python - Asynchronous message handling with the Pika RabbitMQ client", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/9586077/