python - Pika RabbitMQ 客户端的异步消息处理

标签 python rabbitmq pika

正在关注 Pika timed received example ,我想让一个客户端处理更多的并发请求。我的问题是,是否可以在每次收到新消息时以某种方式调用 handle_delivery 而不是等待之前的 handle_delivery 返回?

最佳答案

看起来对 handle_delivery 的调用是阻塞的,但您可以使用 add_timeout 让它向 I/O 事件循环添加一个辅助处理程序。我认为这就是您想要做的:

"""
Asyncronous amqp consumer; do our processing via an ioloop timeout
"""

import sys
import time

from pika.adapters import SelectConnection
from pika.connection import ConnectionParameters

connection = None
channel = None


def on_connected(connection):
    print "timed_receive: Connected to RabbitMQ"
    connection.channel(on_channel_open)


def on_channel_open(channel_):
    global channel
    channel = channel_
    print "timed_receive: Received our Channel"
    channel.queue_declare(queue="test", durable=True,
                          exclusive=False, auto_delete=False,
                          callback=on_queue_declared)

class TimingHandler(object):
    count = 0
    last_count = 0

    def __init__(self, delay=0):
        self.start_time = time.time()
        self.delay = delay

    def handle_delivery(self, channel, method, header, body):
        connection.add_timeout(self.delay, self)

    def __call__(self):
        self.count += 1
        if not self.count % 1000:
            now = time.time()
            duration = now - self.start_time
            sent = self.count - self.last_count
            rate = sent / duration
            self.last_count = self.count
            self.start_time = now
            print "timed_receive: %i Messages Received, %.4f per second" %\
                  (self.count, rate)

def on_queue_declared(frame):
    print "timed_receive: Queue Declared"
    channel.basic_consume(TimingHandler().handle_delivery, queue='test', no_ack=True)


if __name__ == '__main__':

    # Connect to RabbitMQ
    host = (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1'
    connection = SelectConnection(ConnectionParameters(host),
                                  on_connected)
    # Loop until CTRL-C
    try:
        # Start our blocking loop
        connection.ioloop.start()

    except KeyboardInterrupt:

        # Close the connection
        connection.close()

        # Loop until the connection is closed
        connection.ioloop.start()

关于python - Pika RabbitMQ 客户端的异步消息处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9586077/

相关文章:

python - 在尝试安装 numpy 时出错 - python 3

python - HTTPServer.shutdown 返回但套接字仍然打开

python - 使用 Python 在 Selenium Webdriver (Selenium 2) 中显式等待和隐式等待的问题

c# - 包含泛型的类型的 BsonClassMap

c# - 为什么要在 RabbitMQ 中声明 Exchange?

ubuntu - RabbitMQ On Ubuntu 14.04 服务器配置

python - 使用 Pika 客户端轮询 RabbitMQ 消息

Python素数计算器

ruby - 如何使用 bunny RabbitMQ 客户端阻止订阅?

python - Rabbitmq 使用 Tornado 不阻塞消息