python - 在多个Python进程之间共享RabbitMQ channel

标签 python rabbitmq multiprocessing pika

我想在多个 python 进程之间共享BlockingChannel。 为了发送 来自其他 python 进程的 basic_ack

如何在多个 python 进程之间共享 BlockingChannel

代码如下:

self.__connection__ = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.__channel__ = self.__connection__.channel()

我尝试使用 pickle 进行转储,但它不允许转储 Channel 并给出错误 can't pickle select.epoll objects 使用以下代码

filepath = "temp/" + "merger_channel.sav"
pickle.dump(self.__channel__, open(filepath, 'wb'))

目标:

目标是从其他 python 进程的 channel 发送 basic_ack

最佳答案

在多个线程之间共享 channel 是一种反模式,您不太可能设法在进程之间共享它。

经验法则是每个进程 1 个连接,每个线程 1 个 channel

您可以通过以下链接阅读有关此事的更多信息:

  1. 13 common RabbitMQ mistakes
  2. RabbitMQ best practices
  3. This SO线程对RabbitMQ和并发消费进行了深入分析

如果您想将消息消费与多处理结合起来,通常的模式是让主进程接收消息,将其有效负载传递给工作进程池,并在完成后确认它们。

使用pika.BlockingChannelconcurrent.futures.ProcessPoolExecutor的简单示例:

def ack_message(channel, delivery_tag, _future):
    """Called once the message has been processed.
    Acknowledge the message to RabbitMQ.
    """
    channel.basic_ack(delivery_tag=delivery_tag)

for message in channel.consume(queue='example'):
    method, properties, body = message

    future = pool.submit(process_message, body)
    # use partial to pass channel and ack_tag to callback function
    ack_message_callback = functools.partial(ack_message, channel, method.delivery_tag)
    future.add_done_callback(ack_message_callback)      

上面的循环将无休止地消耗来自example队列的消息并将它们提交到进程池。您可以通过 RabbitMQ consumer prefetch 控制同时处理多少条消息范围。检查pika.basic_qos了解如何在 Python 中执行此操作。

关于python - 在多个Python进程之间共享RabbitMQ channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54241917/

相关文章:

python - 虚拟环境 ". venv/bin/activate"与 "source venv/bin/activate"

python - RabbitMQ 持久队列绑定(bind)

java - 多任务延迟和吞吐量

python - 导入openface出错

python - 如何设置SocketServer的最大连接数

erlang - 在 Rabbit 中提取、转换、加载?

python - python 并行化递归

multiprocessing - 破管错误 : [WinError 109] The pipe has been ended during data extraction

python - 如何比较两幅图像的差异并返回点的坐标

django - 为什么我在 Heroku 上有这么多使用 CloudAMQP 的 Celery 消息?