python - 使用 Tornado 和 Pika 进行异步队列监控

标签 python asynchronous rabbitmq amqp tornado

我有一个 AMQP 服务器 ( RabbitMQ ),我想在 Tornado web server 中发布和读取它.为此,我想我会使用异步 amqp python 库;特别是Pika (据称支持 Tornado 的一种变体)。

我编写的代码似乎成功地从队列中读取,除了在请求结束时,我得到一个异常(浏览器返回正常):

[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
    Traceback (most recent call last):
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
        yield
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
        yield
      File "/usr/lib/python2.6/contextlib.py", line 113, in nested
        yield vars
      File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
        callback(*args, **kwargs)
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
        self._handle_read()
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
        self.on_data_available(chunk)
      File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
        self.channels[frame.channel_number].frame_handler(frame)
    KeyError: 1

我不完全确定我是否正确使用了这个库,所以我可能会做一些明显错误的事情。我的代码的基本流程是:

  1. 请求进来
  2. 使用 TornadoConnection 创建到 RabbitMQ 的连接;指定回调
  3. 在连接回调中,创建 channel ,声明/绑定(bind)我的队列,并调用 basic_consume;指定回调
  4. 在消费回调中,关闭 channel 并调用Tornado的finish函数。
  5. 查看异常(exception)情况。

我的问题有几个:

  1. 这个流程是否正确?我不确定连接回调的目的是什么,只是如果我不使用它就不起作用。
  2. 我应该为每个网络请求创建一个 AMQP 连接吗? RabbitMQ 的文档建议不,我不应该,而是我应该坚持只创建 channel 。但是,我应该在哪里创建连接,如果它短暂中断,我该如何尝试重新连接?
  3. 如果我为每个 Web 请求创建一个 AMQP 连接,我应该在哪里关闭它?在我的回调中调用 amqp.close() 似乎把事情搞得更糟。

稍后我会尝试编写一些示例代码,但我上面描述的步骤相当完整地展示了事物的消费方面。我也遇到了发布方面的问题,但队列的消耗更为紧迫。

最佳答案

查看一些源代码会有所帮助,但我在多个生产项目中使用了相同的支持 Tornado 的 pika 模块,没有出现问题。

您不想为每个请求创建一个连接。创建一个包装所有 AMQP 操作的类,并将其实例化为 Tornado 应用程序级别的单例,可以跨请求(和跨请求处理程序)使用。我在一个“runapp()”函数中执行此操作,该函数执行类似这样的操作,然后启动主 Tornado ioloop。

这是一个名为“事件”的类。这是一个部分实现(具体来说,我没有在这里定义'self.handle_event'。这取决于你。

class Event(object):
  def __init__(self, config):
    self.host = 'localhost'
    self.port = '5672'
    self.vhost = '/'
    self.user = 'foo'
    self.exchange = 'myx'
    self.queue = 'myq'
    self.recv_routing_key = 'msgs4me'
    self.passwd = 'bar'

    self.connected = False 
    self.connect()


  def connect(self):

    credentials = pika.PlainCredentials(self.user, self.passwd)

    parameters = pika.ConnectionParameters(host = self.host,
                                         port = self.port,
                                         virtual_host = self.vhost,
                                         credentials = credentials)

    srs = pika.connection.SimpleReconnectionStrategy()

    logging.debug('Events: Connecting to AMQP Broker: %s:%i' % (self.host,
                                                              self.port))
    self.connection = tornado_adapter.TornadoConnection(parameters,
                                                      wait_for_open = False,
                                                      reconnection_strategy = srs,
                                                      callback = self.on_connected)

  def on_connected(self):

    # Open the channel
    logging.debug("Events: Opening a channel")
    self.channel = self.connection.channel()

    # Declare our exchange
    logging.debug("Events: Declaring the %s exchange" %  self.exchange)
    self.channel.exchange_declare(exchange = self.exchange,
                                type = "fanout",
                                auto_delete = False,
                                durable = True)

    # Declare our queue for this process
    logging.debug("Events: Declaring the %s queue" %  self.queue)
    self.channel.queue_declare(queue = self.queue,
                             auto_delete = False,
                             exclusive = False,
                             durable = True)


    # Bind to the exchange
    self.channel.queue_bind(exchange = self.exchange,
                          queue = self.queue,
                          routing_key = self.recv_routing_key)

    self.channel.basic_consume(consumer = self.handle_event, queue = self.queue, no_ack = True)

    # We should be connected if we made it this far
    self.connected = True

然后我将其放入名为“events.py”的文件中。我的 RequestHandlers 和任何后端代码都使用一个“common.py”模块,该模块包装了对两者都有用的代码(我的 RequestHandlers 不直接调用任何 amqp 模块方法——对于数据库、缓存等也是如此),所以我在 common.py 的模块级别定义“events=None”,然后我像这样实例化 Event 对象:

import events

def runapp(config):
    if myapp.common.events is None: 
       myapp.common.events = myapp.events.Event(config)
    logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events)
    http_server = tornado.httpserver.HTTPServer(app,
                                            xheaders=config['HTTPServer']['xheaders'],
                                            no_keep_alive=config['HTTPServer']['no_keep_alive'])
    http_server.listen(port) 
    main_loop = tornado.ioloop.IOLoop.instance()
    logging.debug("MAIN IOLOOP: %s", main_loop)
    main_loop.start()

新年快乐:-D

关于python - 使用 Tornado 和 Pika 进行异步队列监控,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4483440/

相关文章:

python - 如何读取包含 "pandas.core.frame, numpy.core.multiarray"的数据文件

python - 在python上进行ARP请求

javascript - 嵌套异步获取状态的 React 组件会导致 Flux 调度链

javascript - 响应回调时使用 Promise

javascript - Node.js async.whilst() 根本没有执行

rabbitmq - 使用命令行在Windows RabbitMQ节点上创建vHost

python - 错误 : Django 'ChoiceField' object has no attribute 'use_required_attribute'

python - 将两列坐标合并为一列

linux - rabbitmqadmin socket.error : [errno 104] connection reset by peer

java - RabbitMQ 和 channel Java 线程安全