python - 如何在 python 中使用 pika (RabbitMQ) 向消费者添加多处理

标签 python rabbitmq multiprocessing pika

我有非常基本的生产者-消费者代码,用 python 中的 pika 框架编写。问题是 - 消费者端在队列中的消息上运行太慢。我进行了一些测试,发现使用多重处理可以将工作流程加快 27 倍。问题是 - 我不知道向我的代码添加多处理功能的正确方法是什么。

import pika
import json
from datetime import datetime
from functions import download_xmls


def callback(ch, method, properties, body):
    print('Got something')
    body = json.loads(body)
    type = body[-1]['Type']
    print('Object type in work currently ' + type)
    cnums = [x['cadnum'] for x in body[:-1]]
    print('Got {} cnums to work with'.format(len(cnums)))

    date_start = datetime.now()
    download_xmls(type,cnums)
    date_end = datetime.now()
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('Download complete in {} seconds'.format((date_end-date_start).total_seconds()))


def consume(queue_name = 'bot-test'):
    parameters = pika.URLParameters('server@address')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='bot-test')
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

如何从这里开始添加多处理功能?

最佳答案

皮卡有广泛的example code我建议你去看看。请注意,此代码仅供示例使用。在线程上工作的情况下,您将必须使用更智能的方式来管理线程。

目标是不阻塞运行 Pika IO 循环的线程,并从工作线程正确回调 IO 循环。这就是 add_callback_threadsafe 存在并在该代码中使用的原因。

<小时/>

注意: RabbitMQ 团队监控 rabbitmq-users mailing list并且有时只在 StackOverflow 上回答问题。

关于python - 如何在 python 中使用 pika (RabbitMQ) 向消费者添加多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55373867/

相关文章:

python - Int 对象不可下标?

python - 为什么 asyncio.iscoroutine 在生成器上返回 True?

python - 键在字典 python 中显示为 nan

python - Celery 任务始终等待不同文件中的任务

python multiprocessing - 进程挂起加入大队列

python - 用数字对字符串中的重复字母进行编码

java - 嵌入式 AMQP Java 代理

multithreading - 消息队列消息之间的依赖

android - 如何使用 2 个进程从 1 个 android-app 登录到 1 个日志文件

python - 如何使用 multiprocessing.Pool 处理无法 pickle 的函数