python - Rabbitmq 远程调用 Pika

标签 python linux rabbitmq pika

我是 rabbitmq 的新手,正在尝试通过本教程 (https://www.rabbitmq.com/tutorials/tutorial-six-python.html) 弄清楚如何让客户端向服务器请求有关内存和 CPU 使用率的信息。

因此客户端请求 CPU 和内存(我相信我需要两个队列)并且服务器响应这些值。

在这种情况下,是否可以使用 Python 中的 Pika 库简单地创建一个 client.pyserver.py

最佳答案

我建议您关注第一个 RabbitMQ tutorials如果你还没有。 RPC 示例建立在前面示例中涵盖的概念(直接队列、独占队列、确认等)的基础上。

教程中提出的 RPC 解决方案至少需要两个队列,具体取决于您要使用的客户端数量:

  • 一个直接队列(rpc_queue),用于将请求从客户端发送到服务器。
  • 每个客户端一个独占队列,用于接收响应。

请求/响应周期:

  • 客户端向rpc_queue 发送消息。每条消息都包含一个 reply_to 属性,其中包含服务器应该回复的客户端独占队列的名称,以及一个 correlation_id 属性,它只是一个唯一的 id 用于跟踪请求。
  • 服务器等待 rpc_queue 上的消息。当消息到达时,它准备响应,将 correlation_id 添加到新消息,并将其发送到 reply_to 消息属性中定义的队列。
  • 客户端在其独占队列中等待,直到它找到带有最初生成的 correlation_id 的消息。

直接跳到您的问题上,首先要做的是定义您要在响应中使用的消息格式。您可以使用 JSON、msgpack 或任何其他序列化库。例如,如果使用 JSON,一条消息可能如下所示:

{
    "cpu": 1.2,
    "memory": 0.3
} 

然后,在您的 server.py 上:

def on_request(channel, method, props, body):
    response = {'cpu': current_cpu_usage(),
                'memory': current_memory_usage()}
    properties = pika.BasicProperties(correlation_id=props.correlation_id)

    channel.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=properties,
                          body=json.dumps(response))
    channel.basic_ack(delivery_tag=method.delivery_tag)

# ...

在你的 client.py 上:

class ResponseTimeout(Exception): pass

class Client:
    # similar constructor as `FibonacciRpcClient` from tutorial...

    def on_response(self, channel, method, props, body):
        if self.correlation_id == props.correlation_id:
            self.response = json.loads(body.decode())

    def call(self, timeout=2):
        self.response = None
        self.correlation_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.correlation_id),
                                   body='')

        start_time = time.time()
        while self.response is None:
            if (start_time + timeout) < time.time():
                raise ResponseTimeout()
            self.connection.process_data_events()
        return self.response

如您所见,代码与原始 FibonacciRpcClient 几乎相同。主要区别是:

  • 我们使用 JSON 作为消息的数据格式。
  • 我们的客户端 call() 方法不需要 body 参数(没有任何东西可以发送到服务器)
  • 我们会处理响应超时(如果服务器出现故障,或者它没有回复我们的消息)

不过,这里还有很多地方需要改进:

  • 没有错误处理:例如,如果客户端“忘记”发送一个reply_to 队列,我们​​的服务器就会崩溃,并且会在重启时再次崩溃(损坏的消息将无限重新排队,因为只要我们的服务器不承认它)
  • 我们不处理断开的连接(没有重新连接机制)
  • ...

您还可以考虑用发布/订阅 模式替换 RPC 方法;通过这种方式,服务器每隔 X 时间间隔简单地广播其 CPU/内存状态,一个或多个客户端接收更新。

关于python - Rabbitmq 远程调用 Pika,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35793719/

相关文章:

python - 如何修复 "django-admin not recognized"

c++ - 检查 CPP 程序中的输入重定向

RabbitMQ 主题交流 : 1 Exchange vs Many Exchanges

python - python 2.7中巨大列表的时间复杂度

php - 如何在供应商目录下为 php rabbitmq 创建自动加载

sockets - Rabbitmq 监听器未连接到另一台机器

python - python 中的展平矩阵 :

Python 位串 uint 被视为 long

python - 带有分组依据的尾随或移动平均线

linux - 在 curl 获取的 2 个文件之间添加一个新行