python - 如何重启消费者rabbitmq pika python

标签 python python-2.7 rabbitmq twisted pika

我在使用 rabbitmq 的 pika 库设置的消费者上遇到了一些丢失问题。与 pika 一起,我正在使用扭曲的实现来设置异步消费者。我不确定为什么会发生这种情况,但如果消费者退出并且不确定如何去做,我希望实现重新连接。这是我当前的实现

class Consumer(object):
def __init__(self, queue, exchange, routingKey, medium, signalRcallbackFunc):
    self._queue_name = queue
    self.exchange = exchange
    self.routingKey = routingKey
    self.medium = medium
    print "client on"
    self.channel = None
    self.medium.client.on(signalRcallbackFunc, self.callback)

def on_connected(self, connection):
    d = connection.channel()
    d.addCallback(self.got_channel)
    d.addCallback(self.queue_declared)
    d.addCallback(self.queue_bound)
    d.addCallback(self.handle_deliveries)
    d.addErrback(log.err)

def got_channel(self, channel):
    self.channel = channel
    self.channel.basic_qos(prefetch_count=500)
    return self.channel.queue_declare(queue=self._queue_name, durable=True)

def queue_declared(self, queue):
    self.channel.queue_bind(queue=self._queue_name,
                            exchange=self.exchange,
                            routing_key=self.routingKey)

def queue_bound(self, ignored):
    return self.channel.basic_consume(queue=self._queue_name)

def handle_deliveries(self, queue_and_consumer_tag):
    queue, consumer_tag = queue_and_consumer_tag
    self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

    return self.looping_call.start(0)

def consume_from_queue(self, queue):
    d = queue.get()
    return d.addCallback(lambda result: self.handle_payload(*result))

def handle_payload(self, channel, method, properties, body):
    print(body)
    print(properties.headers)
    channel.basic_ack(method.delivery_tag)
    print "#####################################" + method.delivery_tag + "###################################"

def callback(self, data):
    #self.channel.basic_ack(data, multiple=True)
    pass

最佳答案

您可以在 on_connected 回调中为连接注册一个“on-close”处理程序。当连接丢失时会调用它。在这里,您可以重新建立新的连接。

下面这个例子比较有用,是我用过的策略,效果不错... http://pika.readthedocs.io/en/latest/examples/asynchronous_consumer_example.html

对于 twisted pika 库,add_on_close_callback 方法可能会让您走得更远(虽然我还没有测试过)。 https://pika.readthedocs.io/en/0.10.0/modules/adapters/twisted.html

关于python - 如何重启消费者rabbitmq pika python,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37182956/

相关文章:

python - Python 中的 OpenID Connect 提供程序

c# - 将 Python 翻译成 C# - 随机数组

python - 导入错误 Python 2.7。没有名为 : 的模块

python - 为字符串的长度创建一个变量是否更快?

java - spring-rabbit 中的 ClassNotFoundException 取决于消费者或生产者何时启动

go - Go-Micro Rabbit MQ插件-优先发布消息

python - 如何使 XLRD 读取 XLSX 单元格中的超链接?

python - 鹡鸰:获取上一个或下一个 sibling

python - PyQt5 - 类型错误 : signal has 0 argument(s) but 1 provided

java.io.IOException : parseAlgParameters failed: PBE AlgorithmParameters not available, 当 docker 容器尝试访问 rabbitmq TLS 端口时导致