rabbitmq - 您如何使用多条消息回复 RabbitMQ RPC 客户端?

标签 rabbitmq rpc

我正在尝试在 RPC 环境中使用 RabbitMQ,在这种环境中,每个远程调用都将花费大量时间,并不断产生结果。我希望在生成结果时将结果交付给客户。

我从标准教程 RPC 示例开始,然后修改它以使用“直接回复”。我将所有中间结果发布回“匿名独占回调队列”,而不确认原始请求。处理完成后,我将最终消息发送回客户端,然后确认原始请求。但是客户端只看到第一个中间消息。我的客户端恰好是 PHP 而我的服务器是 Python,但我怀疑这不相关。有没有人有使这项工作的魔力?我可以发布代码,但这是食谱中非常基本的内容。

最佳答案

回答我自己的问题。以下工作:

PHP 客户端:

#!/usr/bin/php
<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RpcClient {
    private $connection;
    private $channel;
    private $callback_queue;
    private $response;
    private $corr_id;

    public function __construct() {
        $this->connection = new AMQPStreamConnection(
            'localhost', 5672, 'guest', 'guest'
        );
        $this->channel = $this->connection->channel();

        list($this->callback_queue, ,) = $this->channel->queue_declare(
            "", false, false, true, false
        );


        # For direct reply-to, need to consume amq.rabbitmq.repy-to, a special queue name
        # Unclear what happens to the declare above
        $this->channel->basic_consume(
                $this->callback_queue, '', false, true,
                false, false, array($this, 'onResponse')
        );
    }

    # This is going to be called once for each message coming back
    public function onResponse($rep) {
        if ($rep->get('correlation_id') == $this->corr_id) {
                $response = json_decode($rep->body, true);
                echo print_r($response['line'], true);
                if ($response['type'] == 'final') {
                        $this->response = $rep->body;
                }
        }
    }

    public function call($message_array) {
        $this->response = null;
        $this->corr_id = uniqid();

        $jsonm = json_encode($message_array);
        $msg = new AMQPMessage(
            $jsonm,
            array(
                'correlation_id' => $this->corr_id,
                ### Not sure which of the next two lines is the correct one... if either....
                ##'reply_to' => 'amq.rabbitmq.reply-to' # This is when using direct reply-to
                'reply_to' => $this->callback_queue
            )
        );
        $this->channel->basic_publish($msg, '', 'ansiblePB_rpc_queue');
        while (!$this->response) {
            $this->channel->wait();
        }
        return intval($this->response);
    }
}

$ansiblepb_rpc = new RpcClient();
$response = $ansiblepb_rpc->call(array('userID' => 'jb1234', 
                                       'user_display_name' => 'Joe Bloe',
                                       'limit' => '24000'));
echo ' [.] Got ', $response, "\n";
?>

Python 服务器:

#!/usr/bin/env python
""" 1 """
import glob
import json
import platform
import os
import re
import shutil
import subprocess
import time
import yaml

import pika

class RMQmultireply(object):
    """ Generic class to support ansible_playbook on a Rabbit MQ RPC queue"""
    def __init__(self, channel, method, props):
        #""" Constructor.... duh """
        self.channel = channel
        self.method = method
        self.props = props

    def run(self, userID, username, limit):
        """ Run the main guts of the service """

        cmd = ['/home/dhutchin/devel/rmq/multilineoutput']

        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        for line in proc.stdout.readlines():
            intermediate_json_result = json.dumps({'type': 'intermediate', 'line': line})

            self.channel.basic_publish(exchange='',
                                       routing_key=self.props.reply_to,
                                       properties=pika.BasicProperties(
                                           correlation_id=self.props.correlation_id),
                                       body=str(intermediate_json_result))
            #self.channel.basic_ack(delivery_tag=self.method.delivery_tag)

        proc.wait()
        return proc.returncode


def on_request(channel, method, props, jsonstring):
    """ Request has just come in to run ansible_playbook """


    playbook = RMQmultireply(channel, method, props)

    # fork and exec a playbook
    #  Recieve each line of output and send them as received back
    #  to the requestor.
    #  .run does not return until playbook exits.
    # Use "Direct Reply-to" mechanism to return multiple messages to
    # our client.
    request = yaml.load(jsonstring)  # Yes, yaml works better than JSON
    returncode = playbook.run(request['userID'], request['user_display_name'], request['limit'])

    final_json_result = json.dumps({'type': "final", 'line': '', 'rc': returncode})

    channel.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=pika.BasicProperties(correlation_id=
                                                          props.correlation_id),
                          body=str(final_json_result))

    # Acknowlege the original message so that RabbitMQ can remove it
    # from the ansiblePB_rpc_queue queue
    channel.basic_ack(delivery_tag=method.delivery_tag)


def main():
    """ Its kinda obvious what this does """

    try:
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
    except Exception:
        print "pika.BlockingConnection.... failed... maybe RabbitMQ is not running"
        quit()

    channel = connection.channel()

    channel.queue_declare(queue='ansiblePB_rpc_queue')

    channel.basic_qos(prefetch_count=1)
    # auto_ack is turned off by default, so we don't need to specify auto_ack=False
    channel.basic_consume(queue='ansiblePB_rpc_queue', on_message_callback=on_request)

    print " [x] Awaiting RPC requests"
    channel.start_consuming()


if __name__ == '__main__':
    main()

关于rabbitmq - 您如何使用多条消息回复 RabbitMQ RPC 客户端?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60442782/

相关文章:

python - 用于单个接收者的 Redis 发布/订阅

apache-camel - Camel 不断发送消息

azure - 将rabbitmq部署到Azure

java - 如何在gRPC拦截器中访问消息请求?

python - MySQL 触发器到 RabbitMQ

rabbitmq - 使用 RabbitMQ 订阅远程队列

wordpress - 在 WP 中向自定义帖子类型 slug 添加斜杠

c++ - 如何使用 grpc 和 google Protocol Buffer 对数组进行就地修改?

javascript - 如何将 frida javascript rpc.exports 的所有函数获取到 python?

c# - 如何编写 C# 客户端 RPC 到远程 native C++ 服务器?