python - 是否可以在 Python 中将 RabbitMQ 直接回复功能与 Pika 生成器使用者一起使用?

标签 python rabbitmq generator pika consumer

我想使用 direct reply-to RabbitMQ 的特性与 Pika Python 中的客户端库。它适用于基本 消费者。但它会针对生成器 消费者引发以下异常:

pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - fast reply consumer does not exist')

有没有办法对生成器消费者使用直接回复功能?

使用基本消费者的示例客户端代码(有效):

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_consume(queue="amq.rabbitmq.reply-to",
                          on_message_callback=handle, auto_ack=True)
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)
    channel.start_consuming()

使用生成器消费者的示例客户端代码(引发异常):

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)

    for (method, properties, body) in channel.consume(
            queue="amq.rabbitmq.reply-to", auto_ack=True):
        handle(channel, method, properties, body)

环境。 — Windows 10、RabbitMQ 3.7.13、CPython 3.7.3、Pika 1.0.1。

注意。 — 在示例客户端代码中调用 basic_publish 方法 消费者引发与使用生成器消费者时相同的异常:

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)
    channel.basic_consume(queue="amq.rabbitmq.reply-to",
                          on_message_callback=handle, auto_ack=True)
    channel.start_consuming()

最佳答案

正如 Luke Bakken 所建议的那样 here , 这就是诀窍:

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    next(channel.consume(queue="amq.rabbitmq.reply-to", auto_ack=True,
                         inactivity_timeout=0.1))
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)

    for (method, properties, body) in channel.consume(
            queue="amq.rabbitmq.reply-to", auto_ack=True):
        handle(channel, method, properties, body)

关于python - 是否可以在 Python 中将 RabbitMQ 直接回复功能与 Pika 生成器使用者一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56842059/

相关文章:

python - 如何使用Python检测视频OpenCV中的对象?

python - 将字节字符串读取为 xls 文件

rabbitmq - 队列公平性和消息服务器

python - 如何在保持顺序的同时在生成器上使用线程?

javascript - 无法在客户端从 php 执行 python 脚本

python - 根据列值等于 None 的条件删除 DataFrame 中的行

python - 使用 Wea​​syprint 创建文件响应

python - 我如何停止rabbitmq worker 中间脚本?

python - 如何在Python中创建嵌套的生成器结构?

javascript - 使用promise等待数据库操作