Django:如何建立与rabbitmq的持久连接?

标签 django docker rabbitmq amqp pika

我正在寻找一种从我的 django 应用程序向rabbitmq 服务器发布消息的方法。这不是用于任务卸载,所以我不想使用 Celery。目的是使用 django 应用程序发布到交易所,并在 docker 容器中使用该队列中的姊妹(非 django)应用程序。

这一切看起来都很简单,但是,即使没有明确要求发生这种情况,我似乎也无法在每次不建立和关闭连接的情况下向交换发布。

为了解决这个问题,我定义了一个带有嵌套单例类的类,该类使用 Pika 维护与 rabbitmq 服务器的连接。这个想法是嵌套的单例只会被实例化一次,在那个时候声明连接。任何时候有东西要发布到队列中,单例都会处理它。

import logging
import pika
import os

logger = logging.getLogger('django')

class PikaChannelSingleton:
    class __Singleton:
        channel = pika.adapters.blocking_connection.BlockingChannel

        def __init__(self):
            self.initialize_connection()

        def initialize_connection(self):
            logger.info('Attempting to establish RabbitMQ connection')

            credentials = pika.PlainCredentials(rmq_username, rmq_password)
            parameters = pika.ConnectionParameters(rmq_host, rmq_port, rmq_vhost, credentials, heartbeat=0)
            connection = pika.BlockingConnection(parameters)

            con_chan = connection.channel()
            con_chan.exchange_declare(exchange='xchng', exchange_type='topic', durable=True)
            self.channel = con_chan

        def send(self, routing_key, message):
            if self.channel.is_closed:
                PikaChannelSingleton.instance.initialize_connection()

            self.channel.basic_publish(exchange='xchng', routing_key=routing_key,
                                                                body=message)
    instance = None

    def __init__(self, *args, **kwargs):
        if not PikaChannelSingleton.instance:
            logger.info('Creating channel singleton')
            PikaChannelSingleton.instance = PikaChannelSingleton.__Singleton()

    @staticmethod
    def send(routing_key, message):
        PikaChannelSingleton.instance.send(routing_key, message)

rmq_connection = PikaChannelSingleton()

然后我在 django 应用程序中需要的地方导入 rmq_connection。一切都在玩具应用程序和 python repl 中工作,但是每次在 django 应用程序中调用 send 函数时都会建立一个新的连接。然后连接立即关闭,并显示消息“客户端意外关闭的 TCP 连接”。该消息确实正确发布到交易所。

所以我确信 django 会发生一些事情,以及它如何处理进程等等。问题仍然存在,如何在不每次都重新建立连接的情况下将大量消息发布到队列?

最佳答案

如果我理解正确,连接不能像在单线程上下文中那样保持事件状态。随着您的 Django 应用程序继续执行,amqp 客户端不会发送 heartbeats在 channel 上,连接将死亡。

您可以使用 SelectConnection instead of BlockingConnection ,在 Django 的上下文中可能并不容易。

一个好的折衷方案可能是简单地在您的单例中收集消息,但只使用 BlockingConnection 一次性发送所有消息。在 Django 请求的最后。

关于Django:如何建立与rabbitmq的持久连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55153553/

相关文章:

python - Sentry-youtrack 插件 : PicklingError: Can't pickle <type 'generator' >: attribute lookup __builtin__. 生成器失败

javascript - [Django][Admin]change_form 使用 javascript 自动填充字段

python - Django - 循环导入模块和子模块

docker - 运行在 Docker 中的 Minikube,以及端口转发

docker - Google Cloud 上的 Kubernetes 1.7 : FailedSync Error syncing pod, SandboxChanged Pod 沙箱已更改,它将被杀死并重新创建

python - 处理 Celery 任务代码中的错误的推荐方法是什么?

.net - rabbitmq 的 REST API

mysql - Django MySQL 全文搜索

docker - 使用 docker 复制但排除

javascript - 将嵌套对象数组转换为 Nodejs Buffer