我的应用程序的生产者模块由想要提交要在小型集群上完成的工作的用户运行。它通过 RabbitMQ 消息代理发送 JSON 形式的订阅。
我已经尝试了几种策略,目前为止最好的是下面的,它仍然没有完全奏效:
每个集群机器都运行一个消费者模块,该模块将自己订阅到 AMQP 队列并发出一个prefetch_count 来告诉代理它可以同时运行多少个任务。
我能够使用 Pika AMQP 库中的 SelectConnection 使其工作。消费者和生产者都启动了两个 channel ,一个连接到每个队列。生产者在 channel [A] 上发送请求并在 channel [B] 中等待响应,消费者在 channel [A] 上等待请求并在 channel [B] 上发送响应。然而,似乎当消费者运行计算响应的回调时,它会阻塞,所以我每次只对每个消费者执行一个任务。
我最终需要的是:
- 消费者 [A] 将他的任务(每次大约 5k)订阅到集群
- 代理为每个消费者分发 N 个消息/请求,其中 N 是它可以处理的并发任务数
- 当单个任务完成时,消费者将结果回复给经纪人/生产者
- 生产者收到回复,更新计算状态,最后打印一些报告
限制:
- 如果另一个用户提交工作,他的所有任务都将在前一个用户之后排队(我猜这是队列系统自动实现的,但我没有考虑对线程环境的影响)
- 任务有提交顺序,但回复顺序并不重要
更新
我进一步研究了一下,我的实际问题似乎是我使用一个简单的函数作为对鼠兔的 SelectConnection.channel.basic_consume() 函数的回调。我最后一个(未实现的)想法是传递一个线程函数,而不是常规函数,这样回调就不会阻塞,消费者可以继续监听。
最佳答案
正如您所注意到的,您的进程在运行回调时会阻塞。有多种方法可以处理此问题,具体取决于您的回调所做的事情。
如果您的回调是 IO 绑定(bind)的(进行大量网络或磁盘 IO),您可以使用线程或基于 greenlet 的解决方案,例如 gevent , eventlet , 或 greenhouse .不过请记住,Python 受到 GIL(全局解释器锁)的限制,这意味着在单个 Python 进程中只能运行一段 Python 代码。这意味着如果您使用 Python 代码进行大量计算,这些解决方案可能不会比您现有的解决方案快多少。
另一种选择是使用 multiprocessing 将您的消费者实现为多个进程.我发现 multiprocessing 在进行并行工作时非常有用。您可以使用 Queue 来实现这一点,让父进程成为消费者并将工作分包给它的子进程,或者简单地启动多个进程,每个进程自己消费。我建议,除非您的应用程序是高度并发的(数千个工作人员),否则只需启动多个工作人员,每个工作人员都从自己的连接中消耗。通过这种方式,您可以使用 AMQP 的确认功能,因此如果消费者在仍在处理任务时死亡,消息会自动发送回队列并由另一个工作人员拾取,而不是简单地丢失请求。
如果您控制生产者并且它也是用 Python 编写的,最后一个选项是使用像 celery 这样的任务库。为您抽象任务/队列工作。我已经将 celery 用于几个大型项目,并且发现它写得非常好。它还将通过适当的配置为您处理多个消费者问题。
关于python - 使用 Python、Pika 和 AMQP 设计异步 RPC 应用程序的最佳模式是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7403585/